add karmada operator init workflow

Signed-off-by: calvin <wen.chen@daocloud.io>
This commit is contained in:
calvin 2023-01-06 18:45:54 +08:00
parent c9886a01a8
commit ec53c2a5b1
41 changed files with 4508 additions and 13 deletions

View File

@ -120,6 +120,7 @@ func init() {
func startKarmadaController(ctx ctrlctx.Context) (bool, error) { func startKarmadaController(ctx ctrlctx.Context) (bool, error) {
ctrl := &karmada.Controller{ ctrl := &karmada.Controller{
Config: ctx.Manager.GetConfig(),
Client: ctx.Manager.GetClient(), Client: ctx.Manager.GetClient(),
EventRecorder: ctx.Manager.GetEventRecorderFor(karmada.ControllerName), EventRecorder: ctx.Manager.GetEventRecorderFor(karmada.ControllerName),
} }

View File

@ -656,6 +656,10 @@ spec:
description: ServiceSubnet is the subnet used by k8s services. description: ServiceSubnet is the subnet used by k8s services.
Defaults to "10.96.0.0/12". Defaults to "10.96.0.0/12".
type: string type: string
serviceType:
description: ServiceType represents the service type of karmada
apiserver. it is Nodeport by default.
type: string
type: object type: object
karmadaAggregratedAPIServer: karmadaAggregratedAPIServer:
description: KarmadaAggregratedAPIServer holds settings to karmada-aggregated-apiserver description: KarmadaAggregratedAPIServer holds settings to karmada-aggregated-apiserver
@ -774,10 +778,6 @@ spec:
More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/'
type: object type: object
type: object type: object
serviceType:
description: ServiceType represents the service type of karmada
apiserver. it is Nodeport by default.
type: string
type: object type: object
karmadaControllerManager: karmadaControllerManager:
description: KarmadaControllerManager holds settings to karmada-controller-manager description: KarmadaControllerManager holds settings to karmada-controller-manager

View File

@ -0,0 +1,42 @@
apiVersion: operator.karmada.io/v1alpha1
kind: Karmada
metadata:
name: karmada-demo
namespace: test
spec:
components:
etcd:
local:
imageRepository: registry.k8s.io/etcd
imageTag: 3.5.3-0
volumeData:
emptyDir: {}
karmadaAPIServer:
imageRepository: registry.k8s.io/kube-apiserver
imageTag: v1.25.2
replicas: 1
serviceType: NodePort
serviceSubnet: 10.96.0.0/12
karmadaAggregratedAPIServer:
imageRepository: docker.io/karmada/karmada-aggregated-apiserver
imageTag: v1.4.0
replicas: 1
karmadaControllerManager:
imageRepository: docker.io/karmada/karmada-controller-manager
imageTag: v1.4.0
replicas: 1
karmadaScheduler:
imageRepository: docker.io/karmada/karmada-scheduler
imageTag: v1.4.0
replicas: 1
karmadaWebhook:
imageRepository: docker.io/karmada/karmada-webhook
imageTag: v1.4.0
replicas: 1
kubeControllerManager:
imageRepository: registry.k8s.io/kube-controller-manager
imageTag: v1.25.2
replicas: 1
hostCluster:
networking:
dnsDomain: cluster.local

View File

@ -0,0 +1,10 @@
package v1alpha1
import (
"fmt"
)
// Name returns the image name.
func (image Image) Name() string {
return fmt.Sprintf("%s:%s", image.ImageRepository, image.ImageTag)
}

View File

@ -219,6 +219,11 @@ type KarmadaAPIServer struct {
// +optional // +optional
ServiceSubnet *string `json:"serviceSubnet,omitempty"` ServiceSubnet *string `json:"serviceSubnet,omitempty"`
// ServiceType represents the service type of karmada apiserver.
// it is Nodeport by default.
// +optional
ServiceType corev1.ServiceType `json:"serviceType,omitempty"`
// ExtraArgs is an extra set of flags to pass to the kube-apiserver component or // ExtraArgs is an extra set of flags to pass to the kube-apiserver component or
// override. A key in this map is the flag name as it appears on the command line except // override. A key in this map is the flag name as it appears on the command line except
// without leading dash(es). // without leading dash(es).
@ -270,11 +275,6 @@ type KarmadaAggregratedAPIServer struct {
// +optional // +optional
CertSANs []string `json:"certSANs,omitempty"` CertSANs []string `json:"certSANs,omitempty"`
// ServiceType represents the service type of karmada apiserver.
// it is Nodeport by default.
// +optional
ServiceType corev1.ServiceType `json:"serviceType,omitempty"`
// FeatureGates enabled by the user. // FeatureGates enabled by the user.
// - CustomizedClusterResourceModeling: https://karmada.io/docs/userguide/scheduling/cluster-resources#start-to-use-cluster-resource-models // - CustomizedClusterResourceModeling: https://karmada.io/docs/userguide/scheduling/cluster-resources#start-to-use-cluster-resource-models
// More info: https://github.com/karmada-io/karmada/blob/master/pkg/features/features.go // More info: https://github.com/karmada-io/karmada/blob/master/pkg/features/features.go

456
operator/pkg/certs/certs.go Normal file
View File

@ -0,0 +1,456 @@
package certs
import (
"crypto"
"crypto/ecdsa"
"crypto/elliptic"
cryptorand "crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"math"
"math/big"
"net"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
certutil "k8s.io/client-go/util/cert"
"k8s.io/client-go/util/keyutil"
netutils "k8s.io/utils/net"
operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1"
"github.com/karmada-io/karmada/operator/pkg/constants"
"github.com/karmada-io/karmada/operator/pkg/util"
)
const (
// CertificateBlockType is a possible value for pem.Block.Type.
CertificateBlockType = "CERTIFICATE"
rsaKeySize = 2048
keyExtension = ".key"
certExtension = ".crt"
)
// AltNamesMutatorConfig is a config to AltNamesMutator. It includes necessary
// configs to AltNamesMutator.
type AltNamesMutatorConfig struct {
Name string
Namespace string
Components *operatorv1alpha1.KarmadaComponents
}
type altNamesMutatorFunc func(*AltNamesMutatorConfig, *CertConfig) error
// CertConfig represents a config to generate certificate by karmada.
type CertConfig struct {
Name string
CAName string
NotAfter *time.Time
PublicKeyAlgorithm x509.PublicKeyAlgorithm // TODO: All public key of karmada cert use the RSA algorithm by default
Config certutil.Config
AltNamesMutatorFunc altNamesMutatorFunc
}
func (config *CertConfig) defaultPublicKeyAlgorithm() {
if config.PublicKeyAlgorithm == x509.UnknownPublicKeyAlgorithm {
config.PublicKeyAlgorithm = x509.RSA
}
}
func (config *CertConfig) defaultNotAfter() {
if config.NotAfter == nil {
notAfter := time.Now().Add(constants.CertificateValidity).UTC()
config.NotAfter = &notAfter
}
}
// GetDefaultCertList returns all of karmada certConfigs, it include karmada, front and etcd.
func GetDefaultCertList() []*CertConfig {
return []*CertConfig{
// karmada cert config.
KarmadaCertRootCA(),
KarmadaCertAdmin(),
KarmadaCertApiserver(),
// front proxy cert config.
KarmadaCertFrontProxyCA(),
KarmadaCertFrontProxyClient(),
// ETCD cert config.
KarmadaCertEtcdCA(),
KarmadaCertEtcdServer(),
KarmadaCertEtcdClient(),
}
}
// KarmadaCertRootCA returns karmada ca cert config.
func KarmadaCertRootCA() *CertConfig {
return &CertConfig{
Name: constants.CaCertAndKeyName,
Config: certutil.Config{
CommonName: "karmada",
},
}
}
// KarmadaCertAdmin returns karmada client cert config.
func KarmadaCertAdmin() *CertConfig {
return &CertConfig{
Name: constants.KarmadaCertAndKeyName,
CAName: constants.CaCertAndKeyName,
Config: certutil.Config{
CommonName: "system:admin",
Organization: []string{"system:masters"},
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
},
AltNamesMutatorFunc: makeAltNamesMutator(apiServerAltNamesMutator),
}
}
// KarmadaCertApiserver returns karmada apiserver cert config.
func KarmadaCertApiserver() *CertConfig {
return &CertConfig{
Name: constants.ApiserverCertAndKeyName,
CAName: constants.CaCertAndKeyName,
Config: certutil.Config{
CommonName: "karmada-apiserver",
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
},
AltNamesMutatorFunc: makeAltNamesMutator(apiServerAltNamesMutator),
}
}
// KarmadaCertFrontProxyCA returns karmada front proxy cert config.
func KarmadaCertFrontProxyCA() *CertConfig {
return &CertConfig{
Name: constants.FrontProxyCaCertAndKeyName,
Config: certutil.Config{
CommonName: "front-proxy-ca",
},
}
}
// KarmadaCertFrontProxyClient returns karmada front proxy client cert config.
func KarmadaCertFrontProxyClient() *CertConfig {
return &CertConfig{
Name: constants.FrontProxyClientCertAndKeyName,
CAName: constants.FrontProxyCaCertAndKeyName,
Config: certutil.Config{
CommonName: "front-proxy-client",
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
},
}
}
// KarmadaCertEtcdCA returns karmada front proxy client cert config.
func KarmadaCertEtcdCA() *CertConfig {
return &CertConfig{
Name: constants.EtcdCaCertAndKeyName,
Config: certutil.Config{
CommonName: "karmada-etcd-ca",
},
}
}
// KarmadaCertEtcdServer returns etcd server cert config.
func KarmadaCertEtcdServer() *CertConfig {
return &CertConfig{
Name: constants.EtcdServerCertAndKeyName,
CAName: constants.EtcdCaCertAndKeyName,
Config: certutil.Config{
CommonName: "karmada-etcd-server",
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
},
AltNamesMutatorFunc: makeAltNamesMutator(etcdServerAltNamesMutator),
}
}
// KarmadaCertEtcdClient returns etcd client cert config.
func KarmadaCertEtcdClient() *CertConfig {
return &CertConfig{
Name: constants.EtcdClientCertAndKeyName,
CAName: constants.EtcdCaCertAndKeyName,
Config: certutil.Config{
CommonName: "karmada-etcd-client",
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
},
}
}
// KarmadaCert is karmada certificate, it includes certificate basic message.
// we can directly get the byte array of certificate key and cert from the object.
type KarmadaCert struct {
pairName string
caName string
cert []byte
key []byte
}
// CertData returns certificate cert data.
func (cert *KarmadaCert) CertData() []byte {
return cert.cert
}
// KeyData returns certificate key data.
func (cert *KarmadaCert) KeyData() []byte {
return cert.key
}
// CertName returns cert file name. its default suffix is ".crt".
func (cert *KarmadaCert) CertName() string {
pair := cert.pairName
if len(pair) == 0 {
pair = "cert"
}
return pair + certExtension
}
// KeyName returns cert key file name. its default suffix is ".key".
func (cert *KarmadaCert) KeyName() string {
pair := cert.pairName
if len(pair) == 0 {
pair = "cert"
}
return pair + keyExtension
}
// GeneratePrivateKey generates cert key with default size if 1024. it support
// ECDSA and RAS algorithm.
func GeneratePrivateKey(keyType x509.PublicKeyAlgorithm) (crypto.Signer, error) {
if keyType == x509.ECDSA {
return ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader)
}
return rsa.GenerateKey(cryptorand.Reader, rsaKeySize)
}
// NewCertificateAuthority creates new certificate and private key for the certificate authority
func NewCertificateAuthority(cc *CertConfig) (*KarmadaCert, error) {
cc.defaultPublicKeyAlgorithm()
key, err := GeneratePrivateKey(cc.PublicKeyAlgorithm)
if err != nil {
return nil, fmt.Errorf("unable to create private key while generating CA certificate, err: %w", err)
}
cert, err := certutil.NewSelfSignedCACert(cc.Config, key)
if err != nil {
return nil, fmt.Errorf("unable to create self-signed CA certificate, err: %w", err)
}
encoded, err := keyutil.MarshalPrivateKeyToPEM(key)
if err != nil {
return nil, fmt.Errorf("unable to marshal private key to PEM, err: %w", err)
}
return &KarmadaCert{
pairName: cc.Name,
caName: cc.CAName,
cert: EncodeCertPEM(cert),
key: encoded,
}, nil
}
// CreateCertAndKeyFilesWithCA loads the given certificate authority from disk, then generates and writes out the given certificate and key.
// The certSpec and caCertSpec should both be one of the variables from this package.
func CreateCertAndKeyFilesWithCA(cc *CertConfig, caCertData, caKeyData []byte) (*KarmadaCert, error) {
if len(cc.Config.Usages) == 0 {
return nil, fmt.Errorf("must specify at least one ExtKeyUsage")
}
cc.defaultNotAfter()
cc.defaultPublicKeyAlgorithm()
key, err := GeneratePrivateKey(cc.PublicKeyAlgorithm)
if err != nil {
return nil, fmt.Errorf("unable to create private key, err: %w", err)
}
caCerts, err := certutil.ParseCertsPEM(caCertData)
if err != nil {
return nil, err
}
caKey, err := ParsePrivateKeyPEM(caKeyData)
if err != nil {
return nil, err
}
// Safely pick the first one because the sender's certificate must come first in the list.
// For details, see: https://www.rfc-editor.org/rfc/rfc4346#section-7.4.2
caCert := caCerts[0]
cert, err := NewSignedCert(cc, key, caCert, caKey, false)
if err != nil {
return nil, err
}
encoded, err := keyutil.MarshalPrivateKeyToPEM(key)
if err != nil {
return nil, fmt.Errorf("unable to marshal private key to PEM, err: %w", err)
}
return &KarmadaCert{
pairName: cc.Name,
caName: cc.CAName,
cert: EncodeCertPEM(cert),
key: encoded,
}, nil
}
// NewSignedCert creates a signed certificate using the given CA certificate and key
func NewSignedCert(cc *CertConfig, key crypto.Signer, caCert *x509.Certificate, caKey crypto.Signer, isCA bool) (*x509.Certificate, error) {
serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64))
if err != nil {
return nil, err
}
if len(cc.Config.CommonName) == 0 {
return nil, fmt.Errorf("must specify a CommonName")
}
keyUsage := x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature
if isCA {
keyUsage |= x509.KeyUsageCertSign
}
RemoveDuplicateAltNames(&cc.Config.AltNames)
notAfter := time.Now().Add(constants.CertificateValidity).UTC()
if cc.NotAfter != nil {
notAfter = *cc.NotAfter
}
certTmpl := x509.Certificate{
Subject: pkix.Name{
CommonName: cc.Config.CommonName,
Organization: cc.Config.Organization,
},
DNSNames: cc.Config.AltNames.DNSNames,
IPAddresses: cc.Config.AltNames.IPs,
SerialNumber: serial,
NotBefore: caCert.NotBefore,
NotAfter: notAfter,
KeyUsage: keyUsage,
ExtKeyUsage: cc.Config.Usages,
BasicConstraintsValid: true,
IsCA: isCA,
}
certDERBytes, err := x509.CreateCertificate(cryptorand.Reader, &certTmpl, caCert, key.Public(), caKey)
if err != nil {
return nil, err
}
return x509.ParseCertificate(certDERBytes)
}
// RemoveDuplicateAltNames removes duplicate items in altNames.
func RemoveDuplicateAltNames(altNames *certutil.AltNames) {
if altNames == nil {
return
}
if altNames.DNSNames != nil {
altNames.DNSNames = sets.NewString(altNames.DNSNames...).List()
}
ipsKeys := make(map[string]struct{})
var ips []net.IP
for _, one := range altNames.IPs {
if _, ok := ipsKeys[one.String()]; !ok {
ipsKeys[one.String()] = struct{}{}
ips = append(ips, one)
}
}
altNames.IPs = ips
}
func appendSANsToAltNames(altNames *certutil.AltNames, SANs []string) {
for _, altname := range SANs {
if ip := netutils.ParseIPSloppy(altname); ip != nil {
altNames.IPs = append(altNames.IPs, ip)
} else if len(validation.IsDNS1123Subdomain(altname)) == 0 {
altNames.DNSNames = append(altNames.DNSNames, altname)
} else if len(validation.IsWildcardDNS1123Subdomain(altname)) == 0 {
altNames.DNSNames = append(altNames.DNSNames, altname)
}
}
}
// EncodeCertPEM returns PEM-endcoded certificate data
func EncodeCertPEM(cert *x509.Certificate) []byte {
block := pem.Block{
Type: CertificateBlockType,
Bytes: cert.Raw,
}
return pem.EncodeToMemory(&block)
}
// ParsePrivateKeyPEM parses crypto.Signer from byte array. the key
// must be encryption by ECDSA and RAS.
func ParsePrivateKeyPEM(keyData []byte) (crypto.Signer, error) {
caPrivateKey, err := keyutil.ParsePrivateKeyPEM(keyData)
if err != nil {
return nil, err
}
// Allow RSA and ECDSA formats only
var key crypto.Signer
switch k := caPrivateKey.(type) {
case *rsa.PrivateKey:
key = k
case *ecdsa.PrivateKey:
key = k
default:
return nil, errors.New("the private key is neither in RSA nor ECDSA format")
}
return key, nil
}
func makeAltNamesMutator(f func(cfg *AltNamesMutatorConfig) (*certutil.AltNames, error)) altNamesMutatorFunc {
return func(cfg *AltNamesMutatorConfig, cc *CertConfig) error {
altNames, err := f(cfg)
if err != nil {
return err
}
cc.Config.AltNames = *altNames
return nil
}
}
func etcdServerAltNamesMutator(cfg *AltNamesMutatorConfig) (*certutil.AltNames, error) {
etcdClientServiceDNS := fmt.Sprintf("%s.%s.svc.cluster.local", util.KarmadaEtcdClientName(cfg.Name), cfg.Namespace)
etcdPeerServiceDNS := fmt.Sprintf("*.%s.%s.svc.cluster.local", util.KarmadaEtcdName(cfg.Name), cfg.Namespace)
altNames := &certutil.AltNames{
DNSNames: []string{"localhost", etcdClientServiceDNS, etcdPeerServiceDNS},
IPs: []net.IP{net.IPv4(127, 0, 0, 1)},
}
if cfg.Components.Etcd.Local != nil {
appendSANsToAltNames(altNames, cfg.Components.Etcd.Local.ServerCertSANs)
}
return altNames, nil
}
func apiServerAltNamesMutator(cfg *AltNamesMutatorConfig) (*certutil.AltNames, error) {
altNames := &certutil.AltNames{
DNSNames: []string{
"localhost",
"kubernetes",
"kubernetes.default",
"kubernetes.default.svc",
fmt.Sprintf("*.%s.svc.cluster.local", cfg.Namespace),
fmt.Sprintf("*.%s.svc", cfg.Namespace),
},
IPs: []net.IP{net.IPv4(127, 0, 0, 1)},
}
if len(cfg.Components.KarmadaAPIServer.CertSANs) > 0 {
appendSANsToAltNames(altNames, cfg.Components.KarmadaAPIServer.CertSANs)
}
return altNames, nil
}

107
operator/pkg/certs/store.go Normal file
View File

@ -0,0 +1,107 @@
package certs
import (
"fmt"
"strings"
corev1 "k8s.io/api/core/v1"
)
// CertStore is an Interface that define the cert read and store operator to a cache.
// And we can load a set of certs form a k8s secret.
type CertStore interface {
AddCert(cert *KarmadaCert)
GetCert(name string) *KarmadaCert
CertList() []*KarmadaCert
LoadCertFormSercret(sercret *corev1.Secret) error
}
type splitToPairNameFunc func(name string) string
// SplitToPairName is default function to split cert pair name
// from a secret data key. It only works in this format:
// karmada.crt, karmada.key.
func SplitToPairName(name string) string {
if strings.Contains(name, keyExtension) {
strArr := strings.Split(name, keyExtension)
return strArr[0]
}
if strings.Contains(name, certExtension) {
strArr := strings.Split(name, certExtension)
return strArr[0]
}
return name
}
// KarmadaCertStore is a cache to store karmada certificate. the key is cert baseName by default.
type KarmadaCertStore struct {
certs map[string]*KarmadaCert
pairNameFunc splitToPairNameFunc
}
// NewCertStore returns a cert store. It use default SplitToPairName function to
// get cert pair name form cert file name.
func NewCertStore() CertStore {
return &KarmadaCertStore{
certs: make(map[string]*KarmadaCert),
pairNameFunc: SplitToPairName,
}
}
// AddCert adds a cert to cert store, the cache key is cert pairName by default.
func (store *KarmadaCertStore) AddCert(cert *KarmadaCert) {
store.certs[cert.pairName] = cert
}
// GetCert get cert from store by cert pairName.
func (store *KarmadaCertStore) GetCert(name string) *KarmadaCert {
for _, c := range store.certs {
if c.pairName == name {
return c
}
}
return nil
}
// CertList lists all of karmada certs in the cert chache.
func (store *KarmadaCertStore) CertList() []*KarmadaCert {
certs := make([]*KarmadaCert, 0, len(store.certs))
for _, c := range store.certs {
certs = append(certs, c)
}
return certs
}
// LoadCertFormSercret loads a set of certs form k8s secret resource. we get cert
// cache key by calling the pairNameFunc function. if the secret data key suffix is ".crt",
// it be considered cert data. if the suffix is ".key", it be considered cert key data.
func (store *KarmadaCertStore) LoadCertFormSercret(sercret *corev1.Secret) error {
if len(sercret.Data) == 0 {
return fmt.Errorf("cert data is empty")
}
for name, data := range sercret.Data {
pairName := store.pairNameFunc(name)
kc := store.GetCert(pairName)
if kc == nil {
kc = &KarmadaCert{
pairName: pairName,
}
}
if strings.Contains(name, certExtension) {
kc.cert = data
}
if strings.Contains(name, keyExtension) {
kc.key = data
}
store.AddCert(kc)
}
return nil
}

View File

@ -0,0 +1,79 @@
package constants
import "time"
const (
// KubeDefaultRepository defines the default of the k8s image repository
KubeDefaultRepository = "registry.k8s.io"
// KarmadaDefaultRepository defines the default of the karmada image repository
KarmadaDefaultRepository = "docker.io/karmada"
// EtcdDefaultVersion defines the default of the karmada etcd image tag
EtcdDefaultVersion = "3.5.3-0"
// KarmadaDefaultVersion defines the default of the karmada components image tag
KarmadaDefaultVersion = "v1.4.0"
// KubeDefaultVersion defines the default of the karmada apiserver and kubeControllerManager image tag
KubeDefaultVersion = "v1.25.2"
// Etcd defines the name of the built-in etcd cluster component
Etcd = "etcd"
// KarmadaAPIServer defines the name of the karmada-apiserver component
KarmadaAPIServer = "kube-apiserver"
// KarmadaAggregatedAPIServer defines the name of the karmada-aggregated-apiserver component
KarmadaAggregatedAPIServer = "karmada-aggregated-apiserver"
// KubeControllerManager defines the name of the kube-controller-manager component
KubeControllerManager = "kube-controller-manager"
// KarmadaControllerManager defines the name of the karmada-controller-manager component
KarmadaControllerManager = "karmada-controller-manager"
// KarmadaScheduler defines the name of the karmada-scheduler component
KarmadaScheduler = "karmada-scheduler"
// KarmadaWebhook defines the name of the karmada-webhook component
KarmadaWebhook = "karmada-webhook"
// KarmadaSystemNamespace defines the leader selection namespace for karmada components
KarmadaSystemNamespace = "karmada-system"
// KarmadaDataDir defines the karmada data dir
KarmadaDataDir = "/var/lib/karmada"
// EtcdListenClientPort defines the port etcd listen on for client traffic
EtcdListenClientPort = 2379
// EtcdMetricsPort is the port at which to obtain etcd metrics and health status
EtcdMetricsPort = 2381
// EtcdListenPeerPort defines the port etcd listen on for peer traffic
EtcdListenPeerPort = 2380
// KarmadaAPIserverListenClientPort defines the port karmada apiserver listen on for client traffic
KarmadaAPIserverListenClientPort = 5443
// CertificateValidity Certificate validity period
CertificateValidity = time.Hour * 24 * 365
// CaCertAndKeyName ca certificate key name
CaCertAndKeyName = "ca"
// EtcdCaCertAndKeyName etcd ca certificate key name
EtcdCaCertAndKeyName = "etcd-ca"
// EtcdServerCertAndKeyName etcd server certificate key name
EtcdServerCertAndKeyName = "etcd-server"
// EtcdClientCertAndKeyName etcd client certificate key name
EtcdClientCertAndKeyName = "etcd-client"
// KarmadaCertAndKeyName karmada certificate key name
KarmadaCertAndKeyName = "karmada"
// ApiserverCertAndKeyName karmada apiserver certificate key name
ApiserverCertAndKeyName = "apiserver"
// FrontProxyCaCertAndKeyName front-proxy-client certificate key name
FrontProxyCaCertAndKeyName = "front-proxy-ca"
// FrontProxyClientCertAndKeyName front-proxy-client certificate key name
FrontProxyClientCertAndKeyName = "front-proxy-client"
// ClusterName karmada cluster name
ClusterName = "karmada-apiserver"
// UserName karmada cluster user name
UserName = "karmada-admin"
// KarmadaAPIserverComponent defines the name of karmada apiserver component
KarmadaAPIserverComponent = "KarmadaAPIServer"
// KarmadaAggregratedAPIServerComponent defines the name of karmada aggregrated apiserver component
KarmadaAggregratedAPIServerComponent = "KarmadaAggregratedAAPIServer"
// KubeControllerManagerComponent defines the name of kube controller manager component
KubeControllerManagerComponent = "KubeControllerManager"
// KarmadaControllerManagerComponent defines the name of karmada controller manager component
KarmadaControllerManagerComponent = "KarmadaControllerManager"
// KarmadaSchedulerComponent defines the name of karmada scheduler component
KarmadaSchedulerComponent = "KarmadaScheduler"
)

View File

@ -5,6 +5,7 @@ import (
"time" "time"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/klog/v2" "k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime" controllerruntime "sigs.k8s.io/controller-runtime"
@ -25,7 +26,7 @@ const (
// Controller controls the Karmada resource. // Controller controls the Karmada resource.
type Controller struct { type Controller struct {
client.Client client.Client
Config *rest.Config
EventRecorder record.EventRecorder EventRecorder record.EventRecorder
} }
@ -82,9 +83,11 @@ func (ctrl *Controller) Reconcile(ctx context.Context, req controllerruntime.Req
klog.V(2).InfoS("Reconciling karmada", "name", req.Name) klog.V(2).InfoS("Reconciling karmada", "name", req.Name)
// do reconcile planner, err := NewPlannerFor(karmada, ctrl.Client, ctrl.Config)
if err != nil {
return controllerruntime.Result{}, nil return controllerruntime.Result{}, err
}
return planner.Execute()
} }
func (ctrl *Controller) deleteUnableGCResources(karmada *operatorv1alpha1.Karmada) error { func (ctrl *Controller) deleteUnableGCResources(karmada *operatorv1alpha1.Karmada) error {

View File

@ -0,0 +1,112 @@
package karmada
import (
"fmt"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
operator "github.com/karmada-io/karmada/operator/pkg"
operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1"
workflow "github.com/karmada-io/karmada/operator/pkg/workflow"
)
// Action is a intention corresponding karmada resource modification
type Action string
const (
// InitAction represents init karmada instance
InitAction Action = "init"
// DeInitAction represents delete karmada instance
DeInitAction Action = "deInit"
)
// Planner represents a planner to build a job woflow and startup it.
// the karmada resource change and enqueue is correspond to a action.
// it will create different job workflow according to action.
type Planner struct {
action Action
client.Client
karmada *operatorv1alpha1.Karmada
job *workflow.Job
}
// NewPlannerFor creates planner, it will recognize the karmada resource action
// and create different job.
func NewPlannerFor(karmada *operatorv1alpha1.Karmada, c client.Client, config *rest.Config) (*Planner, error) {
var job *workflow.Job
action := recognizeActionFor(karmada)
switch action {
case InitAction:
opts := []operator.InitOpt{
WithKarmada(karmada),
WithKubeconfig(config),
}
options := operator.NewJobOptions(opts...)
job = operator.NewInitJob(options)
default:
return nil, fmt.Errorf("failed to recognize action for karmada %s", karmada.Name)
}
return &Planner{
karmada: karmada,
Client: c,
job: job,
action: action,
}, nil
}
func recognizeActionFor(karmada *operatorv1alpha1.Karmada) Action {
if !karmada.DeletionTimestamp.IsZero() {
return DeInitAction
}
// TODO: support more action.
return InitAction
}
// Execute starts a job workflow. if the workflow is error,
// TODO: the karmada resource will requeue and reconcile
func (p *Planner) Execute() (controllerruntime.Result, error) {
klog.InfoS("Start execute the workflow", "workflow", p.action, "karmada", klog.KObj(p.karmada))
if err := p.job.Run(); err != nil {
return controllerruntime.Result{Requeue: true}, err
}
klog.InfoS("Successfully executed the workflow", "workflow", p.action, "karmada", klog.KObj(p.karmada))
return controllerruntime.Result{}, nil
}
// WithKarmada returns a InitOpt function to initialize InitOptions with karmada resource
func WithKarmada(karmada *operatorv1alpha1.Karmada) operator.InitOpt {
return func(opt *operator.InitOptions) {
opt.Name = karmada.GetName()
opt.Namespace = karmada.GetNamespace()
opt.FeatureGates = karmada.Spec.FeatureGates
if karmada.Spec.PrivateRegistry != nil && len(karmada.Spec.PrivateRegistry.Registry) > 0 {
opt.PrivateRegistry = karmada.Spec.PrivateRegistry.Registry
}
if karmada.Spec.Components != nil {
opt.Components = karmada.Spec.Components
}
if karmada.Spec.HostCluster != nil {
opt.HostCluster = karmada.Spec.HostCluster
}
}
}
// WithKubeconfig returns a InitOpt function to set kubeconfig to InitOptions with rest config
func WithKubeconfig(config *rest.Config) operator.InitOpt {
return func(options *operator.InitOptions) {
options.Kubeconfig = config
}
}

View File

@ -0,0 +1,142 @@
package apiserver
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1"
"github.com/karmada-io/karmada/operator/pkg/constants"
"github.com/karmada-io/karmada/operator/pkg/util"
"github.com/karmada-io/karmada/operator/pkg/util/apiclient"
)
// EnsureKarmadaAPIServer creates karmada apiserver deployment and service resource
func EnsureKarmadaAPIServer(client clientset.Interface, cfg *operatorv1alpha1.KarmadaComponents, name, namespace string) error {
if err := installKarmadaAPIServer(client, cfg.KarmadaAPIServer, name, namespace); err != nil {
return err
}
return createKarmadaAPIServerService(client, cfg.KarmadaAPIServer, name, namespace)
}
// EnsureKarmadaAggregratedAPIServer creates karmada aggregated apiserver deployment and service resource
func EnsureKarmadaAggregratedAPIServer(client clientset.Interface, cfg *operatorv1alpha1.KarmadaComponents, name, namespace string) error {
if err := installKarmadaAggregratedAPIServer(client, cfg.KarmadaAggregratedAPIServer, name, namespace); err != nil {
return err
}
return createKarmadaAggregratedAPIServerService(client, name, namespace)
}
func installKarmadaAPIServer(client clientset.Interface, cfg *operatorv1alpha1.KarmadaAPIServer, name, namespace string) error {
apiserverDeploymentbytes, err := util.ParseTemplate(KarmadaApiserverDeployment, struct {
DeploymentName, Namespace, Image, EtcdClientService string
ServiceSubnet, KarmadaCertsSecret, EtcdCertsSecret string
Replicas *int32
EtcdListenClientPort int32
}{
DeploymentName: util.KarmadaAPIServerName(name),
Namespace: namespace,
Image: cfg.Image.Name(),
EtcdClientService: util.KarmadaEtcdClientName(name),
ServiceSubnet: *cfg.ServiceSubnet,
KarmadaCertsSecret: util.KarmadaCertSecretName(name),
EtcdCertsSecret: util.EtcdCertSecretName(name),
Replicas: cfg.Replicas,
EtcdListenClientPort: constants.EtcdListenClientPort,
})
if err != nil {
return fmt.Errorf("error when parsing karmadaApiserver deployment template: %w", err)
}
apiserverDeployment := &appsv1.Deployment{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), apiserverDeploymentbytes, apiserverDeployment); err != nil {
return fmt.Errorf("error when decoding karmadaApiserver deployment: %w", err)
}
if err := apiclient.CreateOrUpdateDeployment(client, apiserverDeployment); err != nil {
return fmt.Errorf("error when creating deployment for %s, err: %w", apiserverDeployment.Name, err)
}
return nil
}
func createKarmadaAPIServerService(client clientset.Interface, cfg *operatorv1alpha1.KarmadaAPIServer, name, namespace string) error {
karmadaApiserverServiceBytes, err := util.ParseTemplate(KarmadaApiserverService, struct {
ServiceName, Namespace, ServiceType string
}{
ServiceName: util.KarmadaAPIServerName(name),
Namespace: namespace,
ServiceType: string(cfg.ServiceType),
})
if err != nil {
return fmt.Errorf("error when parsing karmadaApiserver serive template: %w", err)
}
karmadaApiserverService := &corev1.Service{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), karmadaApiserverServiceBytes, karmadaApiserverService); err != nil {
return fmt.Errorf("error when decoding karmadaApiserver serive: %w", err)
}
if err := apiclient.CreateOrUpdateService(client, karmadaApiserverService); err != nil {
return fmt.Errorf("err when creating service for %s, err: %w", karmadaApiserverService.Name, err)
}
return nil
}
func installKarmadaAggregratedAPIServer(client clientset.Interface, cfg *operatorv1alpha1.KarmadaAggregratedAPIServer, name, namespace string) error {
aggregatedAPIServerDeploymentBytes, err := util.ParseTemplate(KarmadaAggregatedAPIServerDeployment, struct {
DeploymentName, Namespace, Image, EtcdClientService string
KubeconfigSecret, KarmadaCertsSecret, EtcdCertsSecret string
Replicas *int32
EtcdListenClientPort int32
}{
DeploymentName: util.KarmadaAggratedAPIServerName(name),
Namespace: namespace,
Image: cfg.Image.Name(),
EtcdClientService: util.KarmadaEtcdClientName(name),
KubeconfigSecret: util.AdminKubeconfigSercretName(name),
KarmadaCertsSecret: util.KarmadaCertSecretName(name),
EtcdCertsSecret: util.EtcdCertSecretName(name),
Replicas: cfg.Replicas,
EtcdListenClientPort: constants.EtcdListenClientPort,
})
if err != nil {
return fmt.Errorf("error when parsing karmadaAggregratedAPIServer deployment template: %w", err)
}
aggregratedAPIServerDeployment := &appsv1.Deployment{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), aggregatedAPIServerDeploymentBytes, aggregratedAPIServerDeployment); err != nil {
return fmt.Errorf("err when decoding karmadaApiserver deployment: %w", err)
}
if err := apiclient.CreateOrUpdateDeployment(client, aggregratedAPIServerDeployment); err != nil {
return fmt.Errorf("error when creating deployment for %s, err: %w", aggregratedAPIServerDeployment.Name, err)
}
return nil
}
func createKarmadaAggregratedAPIServerService(client clientset.Interface, name, namespace string) error {
aggregatedAPIServerServiceBytes, err := util.ParseTemplate(KarmadaAggregatedAPIServerService, struct {
ServiceName, Namespace string
}{
ServiceName: util.KarmadaAggratedAPIServerName(name),
Namespace: namespace,
})
if err != nil {
return fmt.Errorf("error when parsing karmadaAggregratedAPIServer serive template: %w", err)
}
aggregatedAPIServerService := &corev1.Service{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), aggregatedAPIServerServiceBytes, aggregatedAPIServerService); err != nil {
return fmt.Errorf("err when decoding karmadaAggregratedAPIServer serive: %w", err)
}
if err := apiclient.CreateOrUpdateService(client, aggregatedAPIServerService); err != nil {
return fmt.Errorf("err when creating service for %s, err: %w", aggregatedAPIServerService.Name, err)
}
return nil
}

View File

@ -0,0 +1,214 @@
package apiserver
const (
// KarmadaApiserverDeployment is karmada apiserver deployment manifest
KarmadaApiserverDeployment = `
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
karmada-app: kube-apiserver
app.kubernetes.io/managed-by: karmada-operator
name: {{ .DeploymentName }}
namespace: {{ .Namespace }}
spec:
replicas: {{ .Replicas }}
selector:
matchLabels:
karmada-app: kube-apiserver
template:
metadata:
labels:
karmada-app: kube-apiserver
spec:
automountServiceAccountToken: false
containers:
- name: kube-apiserver
image: {{ .Image }}
imagePullPolicy: IfNotPresent
command:
- kube-apiserver
- --allow-privileged=true
- --authorization-mode=Node,RBAC
- --client-ca-file=/etc/karmada/pki/ca.crt
- --disable-admission-plugins=StorageObjectInUseProtection,ServiceAccount
- --enable-admission-plugins=NodeRestriction
- --enable-bootstrap-token-auth=true
- --etcd-cafile=/etc/etcd/pki/etcd-ca.crt
- --etcd-certfile=/etc/etcd/pki/etcd-client.crt
- --etcd-keyfile=/etc/etcd/pki/etcd-client.key
- --etcd-servers=https://{{ .EtcdClientService }}.{{ .Namespace }}.svc.cluster.local:{{ .EtcdListenClientPort }}
- --bind-address=0.0.0.0
- --kubelet-client-certificate=/etc/karmada/pki/karmada.crt
- --kubelet-client-key=/etc/karmada/pki/karmada.key
- --kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname
- --secure-port=5443
- --service-account-issuer=https://kubernetes.default.svc.cluster.local
- --service-account-key-file=/etc/karmada/pki/karmada.key
- --service-account-signing-key-file=/etc/karmada/pki/karmada.key
- --service-cluster-ip-range={{ .ServiceSubnet }}
- --proxy-client-cert-file=/etc/karmada/pki/front-proxy-client.crt
- --proxy-client-key-file=/etc/karmada/pki/front-proxy-client.key
- --requestheader-allowed-names=front-proxy-client
- --requestheader-client-ca-file=/etc/karmada/pki/front-proxy-ca.crt
- --requestheader-extra-headers-prefix=X-Remote-Extra-
- --requestheader-group-headers=X-Remote-Group
- --requestheader-username-headers=X-Remote-User
- --tls-cert-file=/etc/karmada/pki/apiserver.crt
- --tls-private-key-file=/etc/karmada/pki/apiserver.key
- --max-requests-inflight=1500
- --max-mutating-requests-inflight=500
- --v=4
livenessProbe:
failureThreshold: 8
httpGet:
path: /livez
port: 5443
scheme: HTTPS
initialDelaySeconds: 10
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 15
readinessProbe:
failureThreshold: 3
httpGet:
path: /readyz
port: 5443
scheme: HTTPS
initialDelaySeconds: 10
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 15
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: karmada-app
operator: In
values:
- karmada-apiserver
topologyKey: kubernetes.io/hostname
ports:
- containerPort: 5443
name: http
protocol: TCP
volumeMounts:
- mountPath: /etc/karmada/pki
name: apiserver-cert
readOnly: true
- mountPath: /etc/etcd/pki
name: etcd-cert
readOnly: true
priorityClassName: system-node-critical
volumes:
- name: apiserver-cert
secret:
secretName: {{ .KarmadaCertsSecret }}
- name: etcd-cert
secret:
secretName: {{ .EtcdCertsSecret }}
`
// KarmadaApiserverService is karmada apiserver service manifest
KarmadaApiserverService = `
apiVersion: v1
kind: Service
metadata:
labels:
karmada-app: karmada-apiserver
app.kubernetes.io/managed-by: karmada-operator
name: {{ .ServiceName }}
namespace: {{ .Namespace }}
spec:
ports:
- name: client
port: 5443
protocol: TCP
targetPort: 5443
selector:
karmada-app: kube-apiserver
type: {{ .ServiceType }}
`
// KarmadaAggregatedAPIServerDeployment is karmada aggreagated apiserver deployment manifest
KarmadaAggregatedAPIServerDeployment = `
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
karmada-app: karmada-aggregated-apiserver
app.kubernetes.io/managed-by: karmada-operator
name: {{ .DeploymentName }}
namespace: {{ .Namespace }}
spec:
replicas: {{ .Replicas }}
selector:
matchLabels:
karmada-app: karmada-aggregated-apiserver
template:
metadata:
labels:
karmada-app: karmada-aggregated-apiserver
spec:
automountServiceAccountToken: false
containers:
- name: karmada-aggregated-apiserver
image: {{ .Image }}
imagePullPolicy: IfNotPresent
command:
- /bin/karmada-aggregated-apiserver
- --kubeconfig=/etc/karmada/config
- --authentication-kubeconfig=/etc/karmada/config
- --authorization-kubeconfig=/etc/karmada/config
- --etcd-cafile=/etc/etcd/pki/etcd-ca.crt
- --etcd-certfile=/etc/etcd/pki/etcd-client.crt
- --etcd-keyfile=/etc/etcd/pki/etcd-client.key
- --etcd-servers=https://{{ .EtcdClientService }}.{{ .Namespace }}.svc.cluster.local:{{ .EtcdListenClientPort }}
- --tls-cert-file=/etc/karmada/pki/karmada.crt
- --tls-private-key-file=/etc/karmada/pki/karmada.key
- --audit-log-path=-
- --feature-gates=APIPriorityAndFairness=false
- --audit-log-maxage=0
- --audit-log-maxbackup=0
volumeMounts:
- mountPath: /etc/karmada/config
name: kubeconfig
subPath: config
- mountPath: /etc/etcd/pki
name: etcd-cert
readOnly: true
- mountPath: /etc/karmada/pki
name: apiserver-cert
readOnly: true
volumes:
- name: kubeconfig
secret:
secretName: {{ .KubeconfigSecret }}
- name: apiserver-cert
secret:
secretName: {{ .KarmadaCertsSecret }}
- name: etcd-cert
secret:
secretName: {{ .EtcdCertsSecret }}
`
// KarmadaAggregatedAPIServerService is karmada aggregated APIServer Service manifest
KarmadaAggregatedAPIServerService = `
apiVersion: v1
kind: Service
metadata:
labels:
karmada-app: karmada-aggregated-apiserver
app.kubernetes.io/managed-by: karmada-operator
name: {{ .ServiceName }}
namespace: {{ .Namespace }}
spec:
ports:
- port: 443
protocol: TCP
targetPort: 443
selector:
karmada-app: karmada-aggregated-apiserver
type: ClusterIP
`
)

View File

@ -0,0 +1,134 @@
package controlplane
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1"
"github.com/karmada-io/karmada/operator/pkg/constants"
"github.com/karmada-io/karmada/operator/pkg/util"
"github.com/karmada-io/karmada/operator/pkg/util/apiclient"
)
// PatchManifest defines a function to patch deployment
type PatchManifest func(cfg *operatorv1alpha1.KarmadaComponents, deployment *appsv1.Deployment)
// EnsureControlPlaneComponent creates karmada controllerManager, kubeControllerManager, scheduler, webhook component
func EnsureControlPlaneComponent(component, name, namespace string, client clientset.Interface, cfg *operatorv1alpha1.KarmadaComponents, patchManifestFunc PatchManifest) error {
deployments, err := getComponentManifests(name, namespace, cfg)
if err != nil {
return err
}
deployment, ok := deployments[component]
if !ok {
return fmt.Errorf("no exist manifest for %s", component)
}
if patchManifestFunc != nil {
patchManifestFunc(cfg, deployment)
}
if err := apiclient.CreateOrUpdateDeployment(client, deployment); err != nil {
return fmt.Errorf("failed to create deployment resource for component %s, err: %w", component, err)
}
return nil
}
func getComponentManifests(name, namespace string, cfg *operatorv1alpha1.KarmadaComponents) (map[string]*appsv1.Deployment, error) {
kubeControllerManager, err := getKubeControllerManagerManifest(name, namespace, cfg.KubeControllerManager)
if err != nil {
return nil, err
}
karmadaControllerManager, err := karmadaControllerManagerManifest(name, namespace, cfg.KarmadaControllerManager)
if err != nil {
return nil, err
}
scheduler, err := karmadaSchedulerManifest(name, namespace, cfg.KarmadaScheduler)
if err != nil {
return nil, err
}
return map[string]*appsv1.Deployment{
constants.KubeControllerManagerComponent: kubeControllerManager,
constants.KarmadaControllerManagerComponent: karmadaControllerManager,
constants.KarmadaSchedulerComponent: scheduler,
}, nil
}
func getKubeControllerManagerManifest(name, namespace string, cfg *operatorv1alpha1.KubeControllerManager) (*appsv1.Deployment, error) {
kubeControllerManageretBytes, err := util.ParseTemplate(KubeControllerManagerDeployment, struct {
DeploymentName, Namespace, Image string
KarmadaCertsSecret, KubeconfigSecret string
Replicas *int32
}{
DeploymentName: util.KubeControllerManagerName(name),
Namespace: namespace,
Image: cfg.Image.Name(),
KarmadaCertsSecret: util.KarmadaCertSecretName(name),
KubeconfigSecret: util.AdminKubeconfigSercretName(name),
Replicas: cfg.Replicas,
})
if err != nil {
return nil, fmt.Errorf("error when parsing KubeControllerManager Deployment template: %w", err)
}
kcm := &appsv1.Deployment{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), kubeControllerManageretBytes, kcm); err != nil {
return nil, fmt.Errorf("err when decoding KubeControllerManager Deployment: %w", err)
}
return kcm, nil
}
func karmadaControllerManagerManifest(name, namespace string, cfg *operatorv1alpha1.KarmadaControllerManager) (*appsv1.Deployment, error) {
karmadaControllerManageretBytes, err := util.ParseTemplate(KamradaControllerManagerDeployment, struct {
Replicas *int32
DeploymentName, Namespace string
Image, KubeconfigSecret string
}{
DeploymentName: util.KarmadaControllerManagerName(name),
Namespace: namespace,
Image: cfg.Image.Name(),
KubeconfigSecret: util.AdminKubeconfigSercretName(name),
Replicas: cfg.Replicas,
})
if err != nil {
return nil, fmt.Errorf("error when parsing KarmadaControllerManager Deployment template: %w", err)
}
kcm := &appsv1.Deployment{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), karmadaControllerManageretBytes, kcm); err != nil {
return nil, fmt.Errorf("err when decoding KarmadaControllerManager Deployment: %w", err)
}
return kcm, nil
}
func karmadaSchedulerManifest(name, namespace string, cfg *operatorv1alpha1.KarmadaScheduler) (*appsv1.Deployment, error) {
karmadaSchedulerBytes, err := util.ParseTemplate(KarmadaSchedulerDeployment, struct {
Replicas *int32
DeploymentName, Namespace string
Image, KubeconfigSecret string
}{
DeploymentName: util.KarmadaSchedulerName(name),
Namespace: namespace,
Image: cfg.Image.Name(),
KubeconfigSecret: util.AdminKubeconfigSercretName(name),
Replicas: cfg.Replicas,
})
if err != nil {
return nil, fmt.Errorf("error when parsing KarmadaScheduler Deployment template: %w", err)
}
scheduler := &appsv1.Deployment{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), karmadaSchedulerBytes, scheduler); err != nil {
return nil, fmt.Errorf("err when decoding KarmadaScheduler Deployment: %w", err)
}
return scheduler, nil
}

View File

@ -0,0 +1,105 @@
package etcd
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/pointer"
operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1"
"github.com/karmada-io/karmada/operator/pkg/constants"
"github.com/karmada-io/karmada/operator/pkg/util"
"github.com/karmada-io/karmada/operator/pkg/util/apiclient"
)
// EnsureKarmadaEtcd creates etcd StatefulSet and service resource.
func EnsureKarmadaEtcd(client clientset.Interface, cfg *operatorv1alpha1.LocalEtcd, name, namespace string) error {
if err := installKarmadaEtcd(client, name, namespace, cfg); err != nil {
return err
}
return createEtcdService(client, name, namespace)
}
func installKarmadaEtcd(client clientset.Interface, name, namespace string, cfg *operatorv1alpha1.LocalEtcd) error {
etcdStatefuleSetBytes, err := util.ParseTemplate(KarmadaEtcdStatefulSet, struct {
StatefulSetName, Namespace, Image string
EtcdClientService, CertsSecretName, EtcdPeerServiceName string
Replicas *int32
EtcdListenClientPort, EtcdListenPeerPort int32
}{
StatefulSetName: util.KarmadaEtcdName(name),
Namespace: namespace,
Image: cfg.Image.Name(),
EtcdClientService: util.KarmadaEtcdClientName(name),
CertsSecretName: util.EtcdCertSecretName(name),
EtcdPeerServiceName: util.KarmadaEtcdName(name),
Replicas: pointer.Int32(1),
EtcdListenClientPort: constants.EtcdListenClientPort,
EtcdListenPeerPort: constants.EtcdListenPeerPort,
})
if err != nil {
return fmt.Errorf("error when parsing Etcd statefuelset template: %w", err)
}
etcdStatefulSet := &appsv1.StatefulSet{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdStatefuleSetBytes, etcdStatefulSet); err != nil {
return fmt.Errorf("error when decoding Etcd StatefulSet: %w", err)
}
if err := apiclient.CreateOrUpdateStatefulSet(client, etcdStatefulSet); err != nil {
return fmt.Errorf("error when creating Etcd statefulset, err: %w", err)
}
return nil
}
func createEtcdService(client clientset.Interface, name, namespace string) error {
etcdServicePeerBytes, err := util.ParseTemplate(KarmadaEtcdPeerService, struct {
ServiceName, Namespace string
EtcdListenClientPort, EtcdListenPeerPort int32
}{
ServiceName: util.KarmadaEtcdName(name),
Namespace: namespace,
EtcdListenClientPort: constants.EtcdListenClientPort,
EtcdListenPeerPort: constants.EtcdListenPeerPort,
})
if err != nil {
return fmt.Errorf("error when parsing Etcd client serive template: %w", err)
}
etcdPeerService := &corev1.Service{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdServicePeerBytes, etcdPeerService); err != nil {
return fmt.Errorf("error when decoding Etcd client service: %w", err)
}
if err := apiclient.CreateOrUpdateService(client, etcdPeerService); err != nil {
return fmt.Errorf("error when creating etcd client service, err: %w", err)
}
etcdClientServiceBytes, err := util.ParseTemplate(KarmadaEtcdClientService, struct {
ServiceName, Namespace string
EtcdListenClientPort int32
}{
ServiceName: util.KarmadaEtcdClientName(name),
Namespace: namespace,
EtcdListenClientPort: constants.EtcdListenClientPort,
})
if err != nil {
return fmt.Errorf("error when parsing Etcd client serive template: %w", err)
}
etcdClientService := &corev1.Service{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdClientServiceBytes, etcdClientService); err != nil {
return fmt.Errorf("err when decoding Etcd client service: %w", err)
}
if err := apiclient.CreateOrUpdateService(client, etcdClientService); err != nil {
return fmt.Errorf("err when creating etcd client service, err: %w", err)
}
return nil
}

View File

@ -0,0 +1,123 @@
package etcd
const (
// KarmadaEtcdStatefulSet is karmada etcd StatefulSet manifest
KarmadaEtcdStatefulSet = `
apiVersion: apps/v1
kind: StatefulSet
metadata:
labels:
karmada-app: etcd
app.kubernetes.io/managed-by: karmada-operator
namespace: {{ .Namespace }}
name: {{ .StatefulSetName }}
spec:
replicas: {{ .Replicas }}
serviceName: {{ .StatefulSetName }}
podManagementPolicy: Parallel
selector:
matchLabels:
karmada-app: etcd
template:
metadata:
labels:
karmada-app: etcd
tolerations:
- operator: Exists
spec:
automountServiceAccountToken: false
containers:
- name: etcd
image: {{ .Image }}
imagePullPolicy: IfNotPresent
command:
- /usr/local/bin/etcd
- --name={{ .StatefulSetName }}0
- --listen-client-urls= https://0.0.0.0:{{ .EtcdListenClientPort }}
- --listen-peer-urls=http://0.0.0.0:{{ .EtcdListenPeerPort }}
- --advertise-client-urls=https://{{ .EtcdClientService }}.{{ .Namespace }}.svc.cluster.local:{{ .EtcdListenClientPort }}
- --initial-cluster={{ .StatefulSetName }}0=http://{{ .StatefulSetName }}-0.{{ .EtcdPeerServiceName }}.{{ .Namespace }}.svc.cluster.local:{{ .EtcdListenPeerPort }}
- --initial-cluster-state=new
- --client-cert-auth=true
- --trusted-ca-file=/etc/karmada/pki/etcd/etcd-ca.crt
- --cert-file=/etc/karmada/pki/etcd/etcd-server.crt
- --key-file=/etc/karmada/pki/etcd/etcd-server.key
- --data-dir=/var/lib/etcd
- --snapshot-count=10000
- --log-level=debug
livenessProbe:
exec:
command:
- /bin/sh
- -ec
- etcdctl get /registry --prefix --keys-only --endpoints https://127.0.0.1:{{ .EtcdListenClientPort }} --cacert=/etc/karmada/pki/etcd/etcd-ca.crt --cert=/etc/karmada/pki/etcd/etcd-server.crt --key=/etc/karmada/pki/etcd/etcd-server.key
failureThreshold: 3
initialDelaySeconds: 600
periodSeconds: 60
successThreshold: 1
timeoutSeconds: 10
ports:
- containerPort: {{ .EtcdListenClientPort }}
name: client
protocol: TCP
- containerPort: {{ .EtcdListenPeerPort }}
name: server
protocol: TCP
volumeMounts:
- mountPath: /var/lib/etcd
name: etcd-data
- mountPath: /etc/karmada/pki/etcd
name: etcd-cert
volumes:
- name: etcd-cert
secret:
secretName: {{ .CertsSecretName }}
- name: etcd-data
emptyDir: {}
`
// KarmadaEtcdClientService is karmada etcd client service manifest
KarmadaEtcdClientService = `
apiVersion: v1
kind: Service
metadata:
labels:
karmada-app: etcd
app.kubernetes.io/managed-by: karmada-operator
name: {{ .ServiceName }}
namespace: {{ .Namespace }}
spec:
ports:
- name: client
port: {{ .EtcdListenClientPort }}
protocol: TCP
targetPort: {{ .EtcdListenClientPort }}
selector:
karmada-app: etcd
type: ClusterIP
`
// KarmadaEtcdPeerService is karmada etcd peer Service manifest
KarmadaEtcdPeerService = `
apiVersion: v1
kind: Service
metadata:
labels:
karmada-app: etcd
app.kubernetes.io/managed-by: karmada-operator
name: {{ .ServiceName }}
namespace: {{ .Namespace }}
spec:
clusterIP: None
ports:
- name: client
port: {{ .EtcdListenClientPort }}
protocol: TCP
targetPort: {{ .EtcdListenClientPort }}
- name: server
port: {{ .EtcdListenPeerPort }}
protocol: TCP
targetPort: {{ .EtcdListenPeerPort }}
type: ClusterIP
`
)

View File

@ -0,0 +1,194 @@
package controlplane
const (
// KubeControllerManagerDeployment is KubeControllerManage deployment manifest
KubeControllerManagerDeployment = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .DeploymentName }}
namespace: {{ .Namespace }}
labels:
karmada-app: kube-controller-manager
spec:
replicas: {{ .Replicas }}
selector:
matchLabels:
karmada-app: kube-controller-manager
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 1
type: RollingUpdate
template:
metadata:
labels:
karmada-app: kube-controller-manager
spec:
automountServiceAccountToken: false
priorityClassName: system-node-critical
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: karmada-app
operator: In
values: ["kube-controller-manager"]
topologyKey: kubernetes.io/hostname
containers:
- name: kube-controller-manager
image: {{ .Image }}
imagePullPolicy: IfNotPresent
command:
- kube-controller-manager
- --allocate-node-cidrs=true
- --kubeconfig=/etc/karmada/config
- --authentication-kubeconfig=/etc/karmada/config
- --authorization-kubeconfig=/etc/karmada/config
- --bind-address=0.0.0.0
- --client-ca-file=/etc/karmada/pki/ca.crt
- --cluster-cidr=10.244.0.0/16
- --cluster-name=karmada
- --cluster-signing-cert-file=/etc/karmada/pki/ca.crt
- --cluster-signing-key-file=/etc/karmada/pki/ca.key
- --controllers=namespace,garbagecollector,serviceaccount-token,ttl-after-finished,bootstrapsigner,csrapproving,csrcleaner,csrsigning
- --leader-elect=true
- --node-cidr-mask-size=24
- --root-ca-file=/etc/karmada/pki/ca.crt
- --service-account-private-key-file=/etc/karmada/pki/karmada.key
- --service-cluster-ip-range=10.96.0.0/12
- --use-service-account-credentials=true
- --v=4
livenessProbe:
failureThreshold: 8
httpGet:
path: /healthz
port: 10257
scheme: HTTPS
initialDelaySeconds: 10
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 15
volumeMounts:
- name: karmada-certs
mountPath: /etc/karmada/pki
readOnly: true
- name: kubeconfig
mountPath: /etc/karmada/config
subPath: config
volumes:
- name: karmada-certs
secret:
secretName: {{ .KarmadaCertsSecret }}
- name: kubeconfig
secret:
secretName: {{ .KubeconfigSecret }}
`
// KamradaControllerManagerDeployment is karmada controllerManager Deployment manifest
KamradaControllerManagerDeployment = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .DeploymentName }}
namespace: {{ .Namespace }}
labels:
karmada-app: karmada-controller-manager
spec:
replicas: {{ .Replicas }}
selector:
matchLabels:
karmada-app: karmada-controller-manager
template:
metadata:
labels:
karmada-app: karmada-controller-manager
spec:
automountServiceAccountToken: false
tolerations:
- key: node-role.kubernetes.io/master
operator: Exists
containers:
- name: karmada-controller-manager
image: {{ .Image }}
imagePullPolicy: IfNotPresent
command:
- /bin/karmada-controller-manager
- --kubeconfig=/etc/karmada/config
- --bind-address=0.0.0.0
- --cluster-status-update-frequency=10s
- --secure-port=10357
- --failover-eviction-timeout=30s
- --v=4
livenessProbe:
httpGet:
path: /healthz
port: 10357
scheme: HTTP
failureThreshold: 3
initialDelaySeconds: 15
periodSeconds: 15
timeoutSeconds: 5
volumeMounts:
- name: kubeconfig
subPath: config
mountPath: /etc/karmada/config
volumes:
- name: kubeconfig
secret:
secretName: {{ .KubeconfigSecret }}
`
// KarmadaSchedulerDeployment is KarmadaScheduler Deployment manifest
KarmadaSchedulerDeployment = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .DeploymentName }}
namespace: {{ .Namespace }}
labels:
karmada-app: karmada-scheduler
spec:
replicas: {{ .Replicas }}
selector:
matchLabels:
karmada-app: karmada-scheduler
template:
metadata:
labels:
karmada-app: karmada-scheduler
spec:
automountServiceAccountToken: false
tolerations:
- key: node-role.kubernetes.io/master
operator: Exists
containers:
- name: karmada-scheduler
image: {{ .Image }}
imagePullPolicy: IfNotPresent
command:
- /bin/karmada-scheduler
- --kubeconfig=/etc/karmada/config
- --bind-address=0.0.0.0
- --secure-port=10351
- --enable-scheduler-estimator=true
- --v=4
livenessProbe:
httpGet:
path: /healthz
port: 10351
scheme: HTTP
failureThreshold: 3
initialDelaySeconds: 15
periodSeconds: 15
timeoutSeconds: 5
volumeMounts:
- name: kubeconfig
subPath: config
mountPath: /etc/karmada/config
volumes:
- name: kubeconfig
secret:
secretName: {{ .KubeconfigSecret }}
`
)

View File

@ -0,0 +1,77 @@
package webhook
const (
// KarmadaWebhookDeployment is karmada webhook deployment manifest
KarmadaWebhookDeployment = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .DeploymentName }}
namespace: {{ .Namespace }}
labels:
karmada-app: karmada-webhook
spec:
replicas: {{ .Replicas }}
selector:
matchLabels:
karmada-app: karmada-webhook
template:
metadata:
labels:
karmada-app: karmada-webhook
spec:
automountServiceAccountToken: false
tolerations:
- key: node-role.kubernetes.io/master
operator: Exists
containers:
- name: karmada-webhook
image: {{ .Image }}
imagePullPolicy: IfNotPresent
command:
- /bin/karmada-webhook
- --kubeconfig=/etc/karmada/config
- --bind-address=0.0.0.0
- --default-not-ready-toleration-seconds=30
- --default-unreachable-toleration-seconds=30
- --secure-port=8443
- --cert-dir=/var/serving-cert
- --v=4
ports:
- containerPort: 8443
volumeMounts:
- name: kubeconfig
subPath: config
mountPath: /etc/karmada/config
- name: cert
mountPath: /var/serving-cert
readOnly: true
readinessProbe:
httpGet:
path: /readyz
port: 8443
scheme: HTTPS
volumes:
- name: kubeconfig
secret:
secretName: {{ .KubeconfigSecret }}
- name: cert
secret:
secretName: {{ .WebhookCertsSecret }}
`
// KarmadaWebhookService is karmada webhook service manifest
KarmadaWebhookService = `
apiVersion: v1
kind: Service
metadata:
name: {{ .ServiceName }}
namespace: {{ .Namespace }}
spec:
selector:
karmada-app: karmada-webhook
ports:
- port: 443
targetPort: 8443
`
)

View File

@ -0,0 +1,74 @@
package webhook
import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1"
"github.com/karmada-io/karmada/operator/pkg/util"
"github.com/karmada-io/karmada/operator/pkg/util/apiclient"
)
// EnsureKarmadaWebhook creates karmada webhook deployment and service resource.
func EnsureKarmadaWebhook(client clientset.Interface, cfg *operatorv1alpha1.KarmadaWebhook, name, namespace string) error {
if err := installKarmadaWebhook(client, cfg, name, namespace); err != nil {
return err
}
return createKarmadaWebhookService(client, name, namespace)
}
func installKarmadaWebhook(client clientset.Interface, cfg *operatorv1alpha1.KarmadaWebhook, name, namespace string) error {
webhookDeploymentSetBytes, err := util.ParseTemplate(KarmadaWebhookDeployment, struct {
DeploymentName, Namespace, Image string
KubeconfigSecret, WebhookCertsSecret string
Replicas *int32
}{
DeploymentName: util.KarmadaWebhookName(name),
Namespace: namespace,
Image: cfg.Image.Name(),
Replicas: cfg.Replicas,
KubeconfigSecret: util.AdminKubeconfigSercretName(name),
WebhookCertsSecret: util.WebhookCertSecretName(name),
})
if err != nil {
return fmt.Errorf("error when parsing KarmadaWebhook Deployment template: %w", err)
}
webhookDeployment := &appsv1.Deployment{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), webhookDeploymentSetBytes, webhookDeployment); err != nil {
return fmt.Errorf("err when decoding KarmadaWebhook Deployment: %w", err)
}
if err := apiclient.CreateOrUpdateDeployment(client, webhookDeployment); err != nil {
return fmt.Errorf("error when creating deployment for %s, err: %w", webhookDeployment.Name, err)
}
return nil
}
func createKarmadaWebhookService(client clientset.Interface, name, namespace string) error {
webhookServiceSetBytes, err := util.ParseTemplate(KarmadaWebhookService, struct {
ServiceName, Namespace string
}{
ServiceName: util.KarmadaWebhookName(name),
Namespace: namespace,
})
if err != nil {
return fmt.Errorf("error when parsing KarmadaWebhook Service template: %w", err)
}
webhookService := &corev1.Service{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), webhookServiceSetBytes, webhookService); err != nil {
return fmt.Errorf("err when decoding KarmadaWebhook Service: %w", err)
}
if err := apiclient.CreateOrUpdateService(client, webhookService); err != nil {
return fmt.Errorf("err when creating service for %s, err: %w", webhookService.Name, err)
}
return nil
}

283
operator/pkg/init.go Normal file
View File

@ -0,0 +1,283 @@
package karmada
import (
"errors"
"fmt"
"net/url"
"sync"
corev1 "k8s.io/api/core/v1"
utilversion "k8s.io/apimachinery/pkg/util/version"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1"
"github.com/karmada-io/karmada/operator/pkg/certs"
"github.com/karmada-io/karmada/operator/pkg/constants"
tasks "github.com/karmada-io/karmada/operator/pkg/tasks/init"
workflow "github.com/karmada-io/karmada/operator/pkg/workflow"
)
var (
defaultCrdURL = "https://github.com/karmada-io/karmada/releases/download/%s/crds.tar.gz"
)
// InitOptions defines all the init options.
type InitOptions struct {
Name string
Namespace string
Kubeconfig *rest.Config
KarmadaVersion string
CrdRemoteURL string
KarmadaDataDir string
PrivateRegistry string
HostCluster *operatorv1alpha1.HostCluster
Components *operatorv1alpha1.KarmadaComponents
FeatureGates map[string]bool
}
// InitOpt defines a type of function to set InitOptions values.
type InitOpt func(opt *InitOptions)
var _ tasks.InitData = &initData{}
// initData defines all the runtime information used when ruing init workflow;
// this data is shared across all the tasks tha are included in the workflow.
type initData struct {
sync.Once
certs.CertStore
name string
namespace string
karmadaVersion *utilversion.Version
controlplaneConifig *rest.Config
remoteClient clientset.Interface
karmadaClient clientset.Interface
dnsDomain string
crdRemoteURL string
karmadaDataDir string
privateRegistry string
featureGates map[string]bool
components *operatorv1alpha1.KarmadaComponents
}
// NewInitJob initializes a job with list of init sub task. and build
// init runData object.
func NewInitJob(opt *InitOptions) *workflow.Job {
initJob := workflow.NewJob()
// add the all of tasks to the init job workflow.
initJob.AppendTask(tasks.NewPrepareCrdsTask())
initJob.AppendTask(tasks.NewCertTask())
initJob.AppendTask(tasks.NewNamespaceTask())
initJob.AppendTask(tasks.NewUploadKubeconfigTask())
initJob.AppendTask(tasks.NewUploadCertsTask())
initJob.AppendTask(tasks.NewEtcdTask())
initJob.AppendTask(tasks.NewKarmadaApiserverTask())
initJob.AppendTask(tasks.NewWaitApiserverTask())
initJob.AppendTask(tasks.NewKarmadaResourcesTask())
initJob.AppendTask(tasks.NewComponentTask())
initJob.AppendTask(tasks.NewWaitControlPlaneTask())
initJob.SetDataInitializer(func() (workflow.RunData, error) {
// if there is no endpoint info, we are consider that the local cluster
// is remote cluster to install karmada.
var remoteClient clientset.Interface
if opt.HostCluster.SecretRef == nil && len(opt.HostCluster.APIEndpoint) == 0 {
client, err := clientset.NewForConfig(opt.Kubeconfig)
if err != nil {
return nil, fmt.Errorf("error when create cluster client to install karmada, err: %w", err)
}
remoteClient = client
}
if len(opt.Name) == 0 || len(opt.Namespace) == 0 {
return nil, errors.New("unexpected empty name or namespace")
}
version, err := utilversion.ParseGeneric(opt.KarmadaVersion)
if err != nil {
return nil, fmt.Errorf("unexpected karmada invalid version %s", opt.KarmadaVersion)
}
if len(opt.CrdRemoteURL) > 0 {
if _, err := url.Parse(opt.CrdRemoteURL); err != nil {
return nil, fmt.Errorf("unexpected invalid crds remote url %s", opt.CrdRemoteURL)
}
}
// TODO: Verify whether important values of initData is valid
return &initData{
name: opt.Name,
namespace: opt.Namespace,
karmadaVersion: version,
remoteClient: remoteClient,
crdRemoteURL: opt.CrdRemoteURL,
karmadaDataDir: opt.KarmadaDataDir,
privateRegistry: opt.PrivateRegistry,
components: opt.Components,
featureGates: opt.FeatureGates,
dnsDomain: *opt.HostCluster.Networking.DNSDomain,
CertStore: certs.NewCertStore(),
}, nil
})
return initJob
}
func (data *initData) GetName() string {
return data.name
}
func (data *initData) GetNamespace() string {
return data.namespace
}
func (data *initData) RemoteClient() clientset.Interface {
return data.remoteClient
}
func (data *initData) KarmadaClient() clientset.Interface {
if data.karmadaClient == nil {
data.Once.Do(func() {
client, err := clientset.NewForConfig(data.controlplaneConifig)
if err != nil {
klog.Errorf("error when init karmada client, err: %w", err)
}
data.karmadaClient = client
})
}
return data.karmadaClient
}
func (data *initData) ControlplaneConifg() *rest.Config {
return data.controlplaneConifig
}
func (data *initData) SetControlplaneConifg(config *rest.Config) {
data.controlplaneConifig = config
}
func (data *initData) Components() *operatorv1alpha1.KarmadaComponents {
return data.components
}
func (data *initData) DataDir() string {
return data.karmadaDataDir
}
func (data *initData) CrdsRomoteURL() string {
return data.crdRemoteURL
}
func (data *initData) KarmadaVersion() string {
return data.karmadaVersion.String()
}
// NewJobOptions calls all of InitOpt func to initialize a InitOptions.
// if there is not InitOpt functions, it will return a default InitOptions.
func NewJobOptions(opts ...InitOpt) *InitOptions {
options := defaultJobOptions()
for _, c := range opts {
c(options)
}
return options
}
func defaultJobOptions() *InitOptions {
return &InitOptions{
CrdRemoteURL: fmt.Sprintf(defaultCrdURL, constants.KarmadaDefaultVersion),
Name: "karmada",
Namespace: constants.KarmadaSystemNamespace,
KarmadaVersion: constants.KarmadaDefaultVersion,
KarmadaDataDir: constants.KarmadaDataDir,
Components: defaultComponents(),
HostCluster: defaultHostCluster(),
}
}
func defaultHostCluster() *operatorv1alpha1.HostCluster {
return &operatorv1alpha1.HostCluster{
Networking: &operatorv1alpha1.Networking{
DNSDomain: pointer.String("cluster.local"),
},
}
}
func defaultComponents() *operatorv1alpha1.KarmadaComponents {
return &operatorv1alpha1.KarmadaComponents{
Etcd: &operatorv1alpha1.Etcd{
Local: &operatorv1alpha1.LocalEtcd{
Image: operatorv1alpha1.Image{
ImageRepository: fmt.Sprintf("%s/%s", constants.KubeDefaultRepository, constants.Etcd),
ImageTag: constants.EtcdDefaultVersion,
},
VolumeData: &operatorv1alpha1.VolumeData{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
},
KarmadaAPIServer: &operatorv1alpha1.KarmadaAPIServer{
CommonSettings: operatorv1alpha1.CommonSettings{
Replicas: pointer.Int32(1),
Image: operatorv1alpha1.Image{
ImageRepository: fmt.Sprintf("%s/%s", constants.KubeDefaultRepository, constants.KarmadaAPIServer),
ImageTag: constants.KubeDefaultVersion,
},
},
ServiceSubnet: pointer.String("10.96.0.0/12"),
ServiceType: corev1.ServiceTypeClusterIP,
},
KarmadaAggregratedAPIServer: &operatorv1alpha1.KarmadaAggregratedAPIServer{
CommonSettings: operatorv1alpha1.CommonSettings{
Replicas: pointer.Int32(1),
Image: operatorv1alpha1.Image{
ImageRepository: fmt.Sprintf("%s/%s", constants.KarmadaDefaultRepository, constants.KarmadaAggregatedAPIServer),
ImageTag: constants.KarmadaDefaultVersion,
},
},
},
KubeControllerManager: &operatorv1alpha1.KubeControllerManager{
CommonSettings: operatorv1alpha1.CommonSettings{
Replicas: pointer.Int32(1),
Image: operatorv1alpha1.Image{
ImageRepository: fmt.Sprintf("%s/%s", constants.KubeDefaultRepository, constants.KubeControllerManager),
ImageTag: constants.KubeDefaultVersion,
},
},
},
KarmadaControllerManager: &operatorv1alpha1.KarmadaControllerManager{
CommonSettings: operatorv1alpha1.CommonSettings{
Replicas: pointer.Int32(1),
Image: operatorv1alpha1.Image{
ImageRepository: fmt.Sprintf("%s/%s", constants.KarmadaDefaultRepository, constants.KarmadaControllerManager),
ImageTag: constants.KarmadaDefaultVersion,
},
},
},
KarmadaScheduler: &operatorv1alpha1.KarmadaScheduler{
CommonSettings: operatorv1alpha1.CommonSettings{
Replicas: pointer.Int32(1),
Image: operatorv1alpha1.Image{
ImageRepository: fmt.Sprintf("%s/%s", constants.KarmadaDefaultRepository, constants.KarmadaScheduler),
ImageTag: constants.KarmadaDefaultVersion,
},
},
},
KarmadaWebhook: &operatorv1alpha1.KarmadaWebhook{
CommonSettings: operatorv1alpha1.CommonSettings{
Replicas: pointer.Int32(1),
Image: operatorv1alpha1.Image{
ImageRepository: fmt.Sprintf("%s/%s", constants.KarmadaDefaultRepository, constants.KarmadaWebhook),
ImageTag: constants.KarmadaDefaultVersion,
},
},
},
}
}

View File

@ -0,0 +1,85 @@
package apiservice
import (
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientset "k8s.io/client-go/kubernetes"
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
aggregator "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"github.com/karmada-io/karmada/operator/pkg/util"
"github.com/karmada-io/karmada/operator/pkg/util/apiclient"
)
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
)
func init() {
metav1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
utilruntime.Must(apiregistrationv1.AddToScheme(scheme))
}
// EnsureAggregatedAPIService creates aggregated APIService and a service
func EnsureAggregatedAPIService(aggregatorClient *aggregator.Clientset, client clientset.Interface, name, namespace string) error {
if err := aggregatedApiserverService(client, name, namespace); err != nil {
return err
}
return aggregatedAPIService(aggregatorClient, name, namespace)
}
func aggregatedAPIService(client *aggregator.Clientset, name, namespace string) error {
apiServiceBytes, err := util.ParseTemplate(KarmadaAggregatedAPIService, struct {
Namespace string
ServiceName string
}{
Namespace: namespace,
ServiceName: util.KarmadaAggratedAPIServerName(name),
})
if err != nil {
return fmt.Errorf("error when parsing AggregatedApiserver APIService template: %w", err)
}
apiService := &apiregistrationv1.APIService{}
if err := kuberuntime.DecodeInto(codecs.UniversalDecoder(), apiServiceBytes, apiService); err != nil {
return fmt.Errorf("err when decoding AggregatedApiserver APIService: %w", err)
}
if err := apiclient.CreateOrUpdateAPIService(client, apiService); err != nil {
return err
}
return nil
}
func aggregatedApiserverService(client clientset.Interface, name, namespace string) error {
aggregatedApiserverServiceBytes, err := util.ParseTemplate(KarmadaAggregatedApiserverService, struct {
Namespace string
ServiceName string
}{
Namespace: namespace,
ServiceName: util.KarmadaAggratedAPIServerName(name),
})
if err != nil {
return fmt.Errorf("error when parsing AggregatedApiserver Service template: %w", err)
}
aggregatedService := &corev1.Service{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), aggregatedApiserverServiceBytes, aggregatedService); err != nil {
return fmt.Errorf("err when decoding AggregatedApiserver Service: %w", err)
}
if err := apiclient.CreateOrUpdateService(client, aggregatedService); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,34 @@
package apiservice
const (
// KarmadaAggregatedAPIService is karmada aggregated apiserver APIService manifest
KarmadaAggregatedAPIService = `
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
labels:
apiserver: "true"
app: karmada-aggregated-apiserver
name: v1alpha1.cluster.karmada.io
spec:
group: cluster.karmada.io
groupPriorityMinimum: 2000
insecureSkipTLSVerify: true
service:
name: {{ .ServiceName }}
namespace: {{ .Namespace }}
version: v1alpha1
versionPriority: 10
`
// KarmadaAggregatedApiserverService is karmada aggregated apiserver service manifest
KarmadaAggregatedApiserverService = `
apiVersion: v1
kind: Service
metadata:
name: {{ .ServiceName }}
namespace: {{ .Namespace }}
spec:
type: ExternalName
externalName: {{ .ServiceName }}.{{ .Namespace }}.svc
`
)

View File

@ -0,0 +1,165 @@
package webhookconfiguration
const (
// KarmadaWebhookMutatingWebhookConfiguration is karmada webhook mutatingWebhookConfiguration manifest
KarmadaWebhookMutatingWebhookConfiguration = `
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: mutating-config
labels:
app: mutating-config
webhooks:
- name: propagationpolicy.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["policy.karmada.io"]
apiVersions: ["*"]
resources: ["propagationpolicies"]
scope: "Namespaced"
clientConfig:
url: https://{{ .Service }}.{{ .Namespace }}.svc:443/mutate-propagationpolicy
caBundle: {{ .CaBundle }}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 3
- name: clusterpropagationpolicy.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["policy.karmada.io"]
apiVersions: ["*"]
resources: ["clusterpropagationpolicies"]
scope: "Cluster"
clientConfig:
url: https://{{ .Service }}.{{ .Namespace }}.svc:443/mutate-clusterpropagationpolicy
caBundle: {{ .CaBundle }}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 3
- name: overridepolicy.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["policy.karmada.io"]
apiVersions: ["*"]
resources: ["overridepolicies"]
scope: "Namespaced"
clientConfig:
url: https://{{ .Service }}.{{ .Namespace }}.svc:443/mutate-overridepolicy
caBundle: {{ .CaBundle }}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 3
- name: work.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["work.karmada.io"]
apiVersions: ["*"]
resources: ["works"]
scope: "Namespaced"
clientConfig:
url: https://{{ .Service }}.{{ .Namespace }}.svc:443/mutate-work
caBundle: {{ .CaBundle }}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 3
`
// KarmadaWebhookValidatingWebhookConfiguration is KarmadaWebhook ValidatingWebhookConfiguration manifest
KarmadaWebhookValidatingWebhookConfiguration = `
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: validating-config
labels:
app: validating-config
webhooks:
- name: propagationpolicy.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["policy.karmada.io"]
apiVersions: ["*"]
resources: ["propagationpolicies"]
scope: "Namespaced"
clientConfig:
url: https://{{ .Service }}.{{ .Namespace }}.svc:443/validate-propagationpolicy
caBundle: {{ .CaBundle }}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 3
- name: clusterpropagationpolicy.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["policy.karmada.io"]
apiVersions: ["*"]
resources: ["clusterpropagationpolicies"]
scope: "Cluster"
clientConfig:
url: https://{{ .Service }}.{{ .Namespace }}.svc:443/validate-clusterpropagationpolicy
caBundle: {{ .CaBundle }}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 3
- name: overridepolicy.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["policy.karmada.io"]
apiVersions: ["*"]
resources: ["overridepolicies"]
scope: "Namespaced"
clientConfig:
url: https://{{ .Service }}.{{ .Namespace }}.svc:443/validate-overridepolicy
caBundle: {{ .CaBundle }}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 3
- name: clusteroverridepolicy.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["policy.karmada.io"]
apiVersions: ["*"]
resources: ["clusteroverridepolicies"]
scope: "Cluster"
clientConfig:
url: https://{{ .Service }}.{{ .Namespace }}.svc:443/validate-clusteroverridepolicy
caBundle: {{ .CaBundle }}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 3
- name: config.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["config.karmada.io"]
apiVersions: ["*"]
resources: ["resourceinterpreterwebhookconfigurations"]
scope: "Cluster"
clientConfig:
url: https://{{ .Service }}.{{ .Namespace }}.svc:443/validate-resourceinterpreterwebhookconfiguration
caBundle: {{ .CaBundle }}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 3
- name: resourceinterpretercustomization.karmada.io
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["config.karmada.io"]
apiVersions: ["*"]
resources: ["resourceinterpretercustomizations"]
scope: "Cluster"
clientConfig:
url: https://{{ .Service }}.{{ .Namespace }}.svc:443/validate-resourceinterpretercustomization
caBundle: {{ .CaBundle }}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: ["v1"]
timeoutSeconds: 3
`
)

View File

@ -0,0 +1,65 @@
package webhookconfiguration
import (
"fmt"
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
"github.com/karmada-io/karmada/operator/pkg/util"
"github.com/karmada-io/karmada/operator/pkg/util/apiclient"
)
// EnsureWebhookconfiguration creates karmada webhook mutatingWebhookConfiguration and validatingWebhookConfiguration
func EnsureWebhookconfiguration(client clientset.Interface, namespace, name, caBundle string) error {
if err := mutatingWebhookConfiguration(client, namespace, name, caBundle); err != nil {
return err
}
return validatingWebhookConfiguration(client, namespace, name, caBundle)
}
func mutatingWebhookConfiguration(client clientset.Interface, namespace, name, caBundle string) error {
configurationBytes, err := util.ParseTemplate(KarmadaWebhookMutatingWebhookConfiguration, struct {
Service string
Namespace string
CaBundle string
}{
Service: util.KarmadaWebhookName(name),
Namespace: namespace,
CaBundle: caBundle,
})
if err != nil {
return fmt.Errorf("error when parsing Webhook MutatingWebhookConfiguration template: %w", err)
}
mwc := &admissionregistrationv1.MutatingWebhookConfiguration{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), configurationBytes, mwc); err != nil {
return fmt.Errorf("err when decoding Webhook MutatingWebhookConfiguration: %w", err)
}
return apiclient.CreateOrUpdateMutatingWebhookConfiguration(client, mwc)
}
func validatingWebhookConfiguration(client clientset.Interface, namespace, name, caBundle string) error {
configurationBytes, err := util.ParseTemplate(KarmadaWebhookValidatingWebhookConfiguration, struct {
Service string
Namespace string
CaBundle string
}{
Service: util.KarmadaWebhookName(name),
Namespace: namespace,
CaBundle: caBundle,
})
if err != nil {
return fmt.Errorf("error when parsing Webhook ValidatingWebhookConfiguration template: %w", err)
}
vwc := &admissionregistrationv1.ValidatingWebhookConfiguration{}
if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), configurationBytes, vwc); err != nil {
return fmt.Errorf("err when decoding Webhook ValidatingWebhookConfiguration: %w", err)
}
return apiclient.CreateOrUpdateValidatingWebhookConfiguration(client, vwc)
}

View File

@ -0,0 +1,84 @@
package tasks
import (
"errors"
"fmt"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/operator/pkg/constants"
"github.com/karmada-io/karmada/operator/pkg/controlplane/apiserver"
"github.com/karmada-io/karmada/operator/pkg/workflow"
)
// NewKarmadaApiserverTask init apiserver task to install karmada apiserver and
// karmada aggregated apiserver component
func NewKarmadaApiserverTask() workflow.Task {
return workflow.Task{
Name: "apiserver",
Run: runApiserver,
RunSubTasks: true,
Tasks: []workflow.Task{
{
Name: constants.KarmadaAPIserverComponent,
Run: runKarmadaAPIServer,
},
{
Name: constants.KarmadaAggregratedAPIServerComponent,
Run: runKarmadaAggregratedAPIServer,
},
},
}
}
func runApiserver(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("apiserver task invoked with an invalid data struct")
}
klog.V(4).InfoS("[apiserver] Running apiserver task", "karmada", klog.KObj(data))
return nil
}
func runKarmadaAPIServer(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("karmadaApiserver task invoked with an invalid data struct")
}
cfg := data.Components()
if cfg.KarmadaAPIServer == nil {
klog.V(2).InfoS("[karmadaApiserver] Skip install karmada-apiserver component")
return nil
}
err := apiserver.EnsureKarmadaAPIServer(data.RemoteClient(), cfg, data.GetName(), data.GetNamespace())
if err != nil {
return fmt.Errorf("failed to install karmada apiserver component, err: %w", err)
}
klog.V(2).InfoS("[karmadaApiserver] Successfully installed apiserver component", "karmada", klog.KObj(data))
return nil
}
func runKarmadaAggregratedAPIServer(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("karmadaAggregratedApiServer task invoked with an invalid data struct")
}
cfg := data.Components()
if cfg.KarmadaAggregratedAPIServer == nil {
klog.V(2).InfoS("[KarmadaAggregratedApiServer] Skip install karmada-aggregrated-apiserver component")
return nil
}
err := apiserver.EnsureKarmadaAggregratedAPIServer(data.RemoteClient(), cfg, data.GetName(), data.GetNamespace())
if err != nil {
return fmt.Errorf("failed to install karmada aggregrated apiserver, err: %w", err)
}
klog.V(2).InfoS("[KarmadaAggregratedApiServer] Successfully installed karmada apiserve component", "karmada", klog.KObj(data))
return nil
}

View File

@ -0,0 +1,148 @@
package tasks
import (
"context"
"errors"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/operator/pkg/certs"
"github.com/karmada-io/karmada/operator/pkg/util"
"github.com/karmada-io/karmada/operator/pkg/workflow"
)
// NewCertTask init a Certs task to generate all of karmada certs
func NewCertTask() workflow.Task {
return workflow.Task{
Name: "Certs",
Run: runCerts,
Skip: skipCerts,
RunSubTasks: true,
Tasks: newCertSubTasks(),
}
}
func runCerts(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("certs task invoked with an invalid data struct")
}
klog.V(4).InfoS("[certs] Running certs task", "karmada", klog.KObj(data))
return nil
}
func skipCerts(d workflow.RunData) (bool, error) {
data, ok := d.(InitData)
if !ok {
return false, errors.New("certs task invoked with an invalid data struct")
}
secretName := util.KarmadaCertSecretName(data.GetName())
secret, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(), secretName, metav1.GetOptions{})
if err != nil {
return false, nil
}
if err := data.LoadCertFormSercret(secret); err != nil {
return false, err
}
klog.V(4).InfoS("[certs] Successfully loaded certs form secret", "secret", secret.Name, "karmada", klog.KObj(data))
klog.V(2).InfoS("[certs] Skip certs task, found previous certificates in secret", "karmada", klog.KObj(data))
return true, nil
}
func newCertSubTasks() []workflow.Task {
subTasks := []workflow.Task{}
caCert := map[string]*certs.CertConfig{}
for _, cert := range certs.GetDefaultCertList() {
var task workflow.Task
if cert.CAName == "" {
task = workflow.Task{Name: cert.Name, Run: runCATask(cert)}
caCert[cert.Name] = cert
} else {
task = workflow.Task{Name: cert.Name, Run: runCertTask(cert, caCert[cert.CAName])}
}
subTasks = append(subTasks, task)
}
return subTasks
}
func runCATask(kc *certs.CertConfig) func(d workflow.RunData) error {
return func(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("certs task invoked with an invalid data struct")
}
if kc.CAName != "" {
return fmt.Errorf("this function should only be used for CAs, but cert %s has CA %s", kc.Name, kc.CAName)
}
klog.V(4).InfoS("[certs] Creating a new certificate authority", "certName", kc.Name)
cert, err := certs.NewCertificateAuthority(kc)
if err != nil {
return err
}
klog.V(2).InfoS("[certs] Successfully generated ca certificate", "certName", kc.Name)
data.AddCert(cert)
return nil
}
}
func runCertTask(cc, caCert *certs.CertConfig) func(d workflow.RunData) error {
return func(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return fmt.Errorf("certs task invoked with an invalid data struct")
}
if caCert == nil {
return fmt.Errorf("unexpected empty ca cert for %s", cc.Name)
}
if cc.CAName != caCert.Name {
return fmt.Errorf("expected CAname for %s, but was %s", cc.CAName, cc.Name)
}
if err := mutateCertConfig(data, cc); err != nil {
return fmt.Errorf("error when mutate cert altNames for %s, err: %w", cc.Name, err)
}
caCert := data.GetCert(cc.CAName)
cert, err := certs.CreateCertAndKeyFilesWithCA(cc, caCert.CertData(), caCert.KeyData())
if err != nil {
return err
}
data.AddCert(cert)
klog.V(2).InfoS("[certs] Successfully generated certificate", "certName", cc.Name, "caName", cc.CAName)
return nil
}
}
func mutateCertConfig(data InitData, cc *certs.CertConfig) error {
if cc.AltNamesMutatorFunc != nil {
err := cc.AltNamesMutatorFunc(&certs.AltNamesMutatorConfig{
Name: data.GetName(),
Namespace: data.GetNamespace(),
Components: data.Components(),
}, cc)
if err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,92 @@
package tasks
import (
"errors"
"fmt"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/operator/pkg/constants"
"github.com/karmada-io/karmada/operator/pkg/controlplane"
"github.com/karmada-io/karmada/operator/pkg/controlplane/webhook"
"github.com/karmada-io/karmada/operator/pkg/workflow"
)
// NewComponentTask init a components task
func NewComponentTask() workflow.Task {
return workflow.Task{
Name: "components",
Run: runComponents,
RunSubTasks: true,
Tasks: []workflow.Task{
newComponentSubTask(constants.KubeControllerManagerComponent, nil),
newComponentSubTask(constants.KarmadaControllerManagerComponent, nil),
newComponentSubTask(constants.KarmadaSchedulerComponent, nil),
{
Name: "KarmadaWebhook",
Run: runKarmadaWebhook,
},
},
}
}
func runComponents(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return fmt.Errorf("components task invoked with an invalid data struct")
}
klog.V(4).InfoS("[components] Running components task", "karmada", klog.KObj(data))
return nil
}
func newComponentSubTask(component string, patchManifestFunc controlplane.PatchManifest) workflow.Task {
return workflow.Task{
Name: component,
Run: runComponentSubTask(component, patchManifestFunc),
}
}
func runComponentSubTask(component string, patchManifestFunc controlplane.PatchManifest) func(r workflow.RunData) error {
return func(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return fmt.Errorf("certs task invoked with an invalid data struct")
}
err := controlplane.EnsureControlPlaneComponent(
component,
data.GetName(),
data.GetNamespace(),
data.RemoteClient(),
data.Components(),
patchManifestFunc,
)
if err != nil {
return fmt.Errorf("failed to apply component %s, err: %w", component, err)
}
klog.V(2).InfoS("[components] Successfully applied component", "component", component, "karmada", klog.KObj(data))
return nil
}
}
func runKarmadaWebhook(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("certs task invoked with an invalid data struct")
}
cfg := data.Components()
if cfg.KarmadaWebhook == nil {
return errors.New("skip install karmada webhook")
}
err := webhook.EnsureKarmadaWebhook(data.RemoteClient(), cfg.KarmadaWebhook, data.GetName(), data.GetNamespace())
if err != nil {
return fmt.Errorf("failed to apply karmada webhook, err: %w", err)
}
klog.V(2).InfoS("[KarmadaWebhook] Successfully applied karmada webhook component", "karmada", klog.KObj(data))
return nil
}

View File

@ -0,0 +1,142 @@
package tasks
import (
"errors"
"fmt"
"os"
"path"
"strings"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/operator/pkg/util"
"github.com/karmada-io/karmada/operator/pkg/workflow"
)
var (
crdsFileSuffix = "crds.tar.gz"
crdPathSuffix = "crds"
)
// NewPrepareCrdsTask init a prepare-crds task
func NewPrepareCrdsTask() workflow.Task {
return workflow.Task{
Name: "prepare-crds",
Run: runPrepareCrds,
RunSubTasks: true,
Tasks: []workflow.Task{
{
Name: "download-crds",
Skip: skipCrdsDownload,
Run: runCrdsDownload,
},
{
Name: "Unpack",
Run: runUnpack,
},
},
}
}
func runPrepareCrds(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("prepare-crds task invoked with an invalid data struct")
}
crdsDir := path.Join(data.DataDir(), data.KarmadaVersion())
klog.V(4).InfoS("[prepare-crds] Running prepare-crds task", "karmada", klog.KObj(data))
klog.V(2).InfoS("[prepare-crds] Using crd folder", "folder", crdsDir, "karmada", klog.KObj(data))
return nil
}
func skipCrdsDownload(r workflow.RunData) (bool, error) {
data, ok := r.(InitData)
if !ok {
return false, errors.New("prepare-crds task invoked with an invalid data struct")
}
crdsDir := path.Join(data.DataDir(), data.KarmadaVersion())
if exist, err := util.PathExists(crdsDir); !exist || err != nil {
return false, err
}
if !existCrdsTar(crdsDir) {
return false, nil
}
klog.V(2).InfoS("[download-crds] Skip download crd yaml files, the crd tar exists on disk", "karmada", klog.KObj(data))
return true, nil
}
func runCrdsDownload(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("download-crds task invoked with an invalid data struct")
}
var (
crdsDir = path.Join(data.DataDir(), data.KarmadaVersion())
crdsTarPath = path.Join(crdsDir, crdsFileSuffix)
)
exist, err := util.PathExists(crdsDir)
if err != nil {
return err
}
if !exist {
if err := os.MkdirAll(crdsDir, 0755); err != nil {
return err
}
}
if !existCrdsTar(crdsDir) {
err := util.DownloadFile(data.CrdsRomoteURL(), crdsTarPath)
if err != nil {
return fmt.Errorf("failed to download crd tar, err: %w", err)
}
} else {
klog.V(2).InfoS("[download-crds] The crd tar exists on disk", "path", crdsDir, "karmada", klog.KObj(data))
}
klog.V(2).InfoS("[download-crds] Successfully downloaded crd package for remote url", "karmada", klog.KObj(data))
return nil
}
func runUnpack(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("unpack task invoked with an invalid data struct")
}
var (
crdsDir = path.Join(data.DataDir(), data.KarmadaVersion())
crdsTarPath = path.Join(crdsDir, crdsFileSuffix)
crdsPath = path.Join(crdsDir, crdPathSuffix)
)
// TODO: check whether crd yaml is valid.
exist, _ := util.PathExists(crdsPath)
if !exist {
if err := util.Unpack(crdsTarPath, crdsDir); err != nil {
return fmt.Errorf("[unpack] failed to unpack crd tar, err: %w", err)
}
} else {
klog.V(2).InfoS("[unpack] These crds yaml files have been decompressed in the path", "path", crdsPath, "karmada", klog.KObj(data))
}
klog.V(2).InfoS("[unpack] Successfully unpacked crd tar", "karmada", klog.KObj(data))
return nil
}
func existCrdsTar(crdsDir string) bool {
files := util.ListFiles(crdsDir)
for _, file := range files {
if strings.Contains(file.Name(), crdsFileSuffix) && file.Size() > 0 {
return true
}
}
return false
}

View File

@ -0,0 +1,24 @@
package tasks
import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1"
"github.com/karmada-io/karmada/operator/pkg/certs"
)
// InitData is interface to operate the runData in workflow
type InitData interface {
certs.CertStore
GetName() string
GetNamespace() string
SetControlplaneConifg(config *rest.Config)
ControlplaneConifg() *rest.Config
RemoteClient() clientset.Interface
KarmadaClient() clientset.Interface
DataDir() string
CrdsRomoteURL() string
KarmadaVersion() string
Components() *operatorv1alpha1.KarmadaComponents
}

View File

@ -0,0 +1,45 @@
package tasks
import (
"errors"
"fmt"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/operator/pkg/controlplane/etcd"
"github.com/karmada-io/karmada/operator/pkg/workflow"
)
// NewEtcdTask init a etcd task to install etcd component
func NewEtcdTask() workflow.Task {
return workflow.Task{
Name: "Etcd",
Run: runEtcd,
}
}
func runEtcd(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("etcd task invoked with an invalid data struct")
}
klog.V(4).InfoS("[etcd] Running etcd task", "karmada", klog.KObj(data))
cfg := data.Components()
if cfg.Etcd.External != nil {
klog.V(2).InfoS("[etcd] use external etcd, skip install etcd job", "karmada", data.GetName())
return nil
}
if cfg.Etcd.Local == nil {
return errors.New("unexpect empty etcd local configuration")
}
err := etcd.EnsureKarmadaEtcd(data.RemoteClient(), cfg.Etcd.Local, data.GetName(), data.GetNamespace())
if err != nil {
return fmt.Errorf("failed to install etcd component, err: %w", err)
}
klog.V(2).InfoS("[etcd] Successfully installed etcd component", "karmada", klog.KObj(data))
return nil
}

View File

@ -0,0 +1,220 @@
package tasks
import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"path"
"regexp"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
crdsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/operator/pkg/constants"
"github.com/karmada-io/karmada/operator/pkg/karmadaresource/apiservice"
"github.com/karmada-io/karmada/operator/pkg/karmadaresource/webhookconfiguration"
"github.com/karmada-io/karmada/operator/pkg/util"
"github.com/karmada-io/karmada/operator/pkg/util/apiclient"
"github.com/karmada-io/karmada/operator/pkg/workflow"
)
// NewKarmadaResourcesTask init KarmadaResources task
func NewKarmadaResourcesTask() workflow.Task {
return workflow.Task{
Name: "KarmadaResources",
Run: runKarmadaResources,
RunSubTasks: true,
Tasks: []workflow.Task{
{
Name: "systemNamespace",
Run: runSystemNamespace,
},
{
Name: "crds",
Run: runCrds,
},
{
Name: "WebhookConfiguration",
Run: runWebhookConfiguration,
},
{
Name: "APIService",
Run: runAPIService,
},
},
}
}
func runKarmadaResources(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("karmadaResources task invoked with an invalid data struct")
}
klog.V(4).InfoS("[karmadaResources] Running karmadaResources task", "karmada", klog.KObj(data))
return nil
}
func runSystemNamespace(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("systemName task invoked with an invalid data struct")
}
err := apiclient.CreateNamespace(data.KarmadaClient(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: data.GetNamespace(),
},
})
if err != nil {
return fmt.Errorf("failed to create namespace %s, err: %w", data.GetNamespace(), err)
}
klog.V(2).InfoS("[systemName] Successfully created karmada system namespace", "namespace", data.GetNamespace(), "karmada", klog.KObj(data))
return nil
}
func runCrds(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("crds task invoked with an invalid data struct")
}
var (
crdsDir = path.Join(data.DataDir(), data.KarmadaVersion())
crdsPath = path.Join(crdsDir, "crds/bases")
crdsPatchPath = path.Join(crdsDir, "crds/patches")
)
crdsClient, err := apiclient.NewCRDsClient(data.ControlplaneConifg())
if err != nil {
return err
}
if err := createCrds(crdsClient, crdsPath); err != nil {
return fmt.Errorf("failed to create karmada crds, err: %w", err)
}
cert := data.GetCert(constants.CaCertAndKeyName)
if len(cert.CertData()) == 0 {
return errors.New("unexpect empty ca cert data")
}
caBase64 := base64.StdEncoding.EncodeToString(cert.CertData())
if err := patchCrds(crdsClient, crdsPatchPath, caBase64); err != nil {
return fmt.Errorf("failed to patch karmada crds, err: %w", err)
}
klog.V(2).InfoS("[systemName] Successfully applied karmada crds resource", "karmada", klog.KObj(data))
return nil
}
func createCrds(crdsClient *crdsclient.Clientset, crdsPath string) error {
for _, file := range util.ListFiles(crdsPath) {
if file.IsDir() || path.Ext(file.Name()) != ".yaml" {
continue
}
crdBytes, err := util.ReadYamlFile(path.Join(crdsPath, file.Name()))
if err != nil {
return err
}
obj := apiextensionsv1.CustomResourceDefinition{}
if err := json.Unmarshal(crdBytes, &obj); err != nil {
klog.ErrorS(err, "error when converting json byte to apiExtensionsV1 CustomResourceDefinition struct")
return err
}
if err := apiclient.CreateCustomResourceDefinitionIfNeed(crdsClient, &obj); err != nil {
return err
}
}
return nil
}
func patchCrds(crdsClient *crdsclient.Clientset, patchPath string, caBundle string) error {
for _, file := range util.ListFiles(patchPath) {
if file.IsDir() || path.Ext(file.Name()) != ".yaml" {
continue
}
reg, err := regexp.Compile("{{caBundle}}")
if err != nil {
return err
}
crdPath := path.Join(patchPath, file.Name())
crdBytes, err := util.RelpaceYamlForReg(crdPath, caBundle, reg)
if err != nil {
return err
}
crdResource := splitToCrdNameFormFile(file.Name(), "_", ".")
name := crdResource + ".work.karmada.io"
if err := apiclient.PatchCustomResourceDefinition(crdsClient, name, crdBytes); err != nil {
return err
}
}
return nil
}
func runWebhookConfiguration(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("[webhookConfiguration] task invoked with an invalid data struct")
}
cert := data.GetCert(constants.CaCertAndKeyName)
if len(cert.CertData()) == 0 {
return errors.New("unexpect empty ca cert data for webhookConfiguration")
}
caBase64 := base64.StdEncoding.EncodeToString(cert.CertData())
return webhookconfiguration.EnsureWebhookconfiguration(
data.KarmadaClient(),
data.GetNamespace(),
data.GetName(),
caBase64)
}
func runAPIService(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("webhookConfiguration task invoked with an invalid data struct")
}
config := data.ControlplaneConifg()
client, err := apiclient.NewAPIRegistrationClient(config)
if err != nil {
return err
}
err = apiservice.EnsureAggregatedAPIService(client, data.KarmadaClient(), data.GetName(), data.GetNamespace())
if err != nil {
return fmt.Errorf("failed to apply aggregated APIService resource to karmada controlplane, err: %w", err)
}
waiter := apiclient.NewKarmadaWaiter(config, nil, time.Second*20)
if err := apiclient.TryRunCommand(waiter.WaitForAPIService, 3); err != nil {
return fmt.Errorf("the APIService is unhealthy, err: %w", err)
}
klog.V(2).InfoS("[APIService] Aggregated APIService status is ready ", "karmada", klog.KObj(data))
return nil
}
func splitToCrdNameFormFile(file string, start, end string) string {
index := strings.LastIndex(file, start)
crdName := file[index+1:]
index = strings.Index(crdName, end)
if index > 0 {
crdName = crdName[:index]
}
return crdName
}

View File

@ -0,0 +1,40 @@
package tasks
import (
"errors"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/operator/pkg/util/apiclient"
"github.com/karmada-io/karmada/operator/pkg/workflow"
)
// NewNamespaceTask init a task to create namespace
func NewNamespaceTask() workflow.Task {
return workflow.Task{
Name: "Namespace",
Run: runNamespace,
}
}
func runNamespace(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("namespace task invoked with an invalid data struct")
}
klog.V(4).InfoS("[namespace] Running namespace task", "karmada", klog.KObj(data))
err := apiclient.CreateNamespace(data.RemoteClient(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: data.GetNamespace(),
},
})
if err != nil {
return fmt.Errorf("failed to create namespace for %s, err: %w", data.GetName(), err)
}
return nil
}

View File

@ -0,0 +1,246 @@
package tasks
import (
"crypto/x509"
"errors"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
certutil "k8s.io/client-go/util/cert"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/operator/pkg/certs"
"github.com/karmada-io/karmada/operator/pkg/constants"
"github.com/karmada-io/karmada/operator/pkg/util"
"github.com/karmada-io/karmada/operator/pkg/util/apiclient"
"github.com/karmada-io/karmada/operator/pkg/workflow"
)
// NewUploadKubeconfigTask init a task to upload karmada kubeconfig and
// all of karmada certs to secret
func NewUploadKubeconfigTask() workflow.Task {
return workflow.Task{
Name: "upload-config",
RunSubTasks: true,
Run: runUploadKubeconfig,
Tasks: []workflow.Task{
{
Name: "UploadAdminKubeconfig",
Run: runUploadAdminKubeconfig,
},
},
}
}
func runUploadKubeconfig(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("upload-config task invoked with an invalid data struct")
}
klog.V(4).InfoS("[upload-config] Running task", "karmada", klog.KObj(data))
return nil
}
func runUploadAdminKubeconfig(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("upload-config task invoked with an invalid data struct")
}
apiserverName := util.KarmadaAPIServerName(data.GetName())
// TODO: How to get controlPlaneEndpoint?
localEndpoint := fmt.Sprintf("https://%s.%s.svc.cluster.local:%d", apiserverName, data.GetNamespace(), constants.KarmadaAPIserverListenClientPort)
kubeconfig, err := buildKubeConfigFromSpec(data.GetCert(constants.CaCertAndKeyName), localEndpoint)
if err != nil {
return err
}
configBytes, err := clientcmd.Write(*kubeconfig)
if err != nil {
return err
}
err = apiclient.CreateOrUpdateSecret(data.RemoteClient(), &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: data.GetNamespace(),
Name: util.AdminKubeconfigSercretName(data.GetName()),
},
Data: map[string][]byte{"config": configBytes},
})
if err != nil {
return fmt.Errorf("failed to create secret of kubeconfig, err: %w", err)
}
// store rest config to RunData.
config, err := clientcmd.RESTConfigFromKubeConfig(configBytes)
if err != nil {
return err
}
data.SetControlplaneConifg(config)
klog.V(2).InfoS("[upload-config] Successfully created secret of karmada apiserver kubeconfig", "karmada", klog.KObj(data))
return nil
}
func buildKubeConfigFromSpec(ca *certs.KarmadaCert, serverURL string) (*clientcmdapi.Config, error) {
if ca == nil {
return nil, errors.New("unable build karmada admin kubeconfig, CA cert is empty")
}
cc := newClientCertConfigFromKubeConfigSpec(nil)
client, err := certs.CreateCertAndKeyFilesWithCA(cc, ca.CertData(), ca.KeyData())
if err != nil {
return nil, fmt.Errorf("failed to generate karmada apiserver client certificate for kubeconfig, err: %w", err)
}
return util.CreateWithCerts(
serverURL,
constants.ClusterName,
constants.UserName,
ca.CertData(),
client.KeyData(),
client.CertData(),
), nil
}
// NewUploadCertsTask init a Upload-Certs task
func NewUploadCertsTask() workflow.Task {
return workflow.Task{
Name: "Upload-Certs",
Run: runUploadCerts,
RunSubTasks: true,
Tasks: []workflow.Task{
{
Name: "Upload-KarmadaCert",
Run: runUploadKarmadaCert,
},
{
Name: "Upload-EtcdCert",
Run: runUploadEtcdCert,
},
{
Name: "Upload-WebHookCert",
Run: runUploadWebHookCert,
},
},
}
}
func runUploadCerts(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("upload-certs task invoked with an invalid data struct")
}
klog.V(4).InfoS("[upload-certs] Running upload-certs task", "karmada", klog.KObj(data))
if len(data.CertList()) == 0 {
return errors.New("there is no certs in store, please reload crets to store")
}
return nil
}
func runUploadKarmadaCert(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("upload-KarmadaCert task invoked with an invalid data struct")
}
certs := data.CertList()
certsData := make(map[string][]byte, len(certs))
for _, c := range certs {
certsData[c.KeyName()] = c.KeyData()
certsData[c.CertName()] = c.CertData()
}
err := apiclient.CreateOrUpdateSecret(data.RemoteClient(), &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: util.KarmadaCertSecretName(data.GetName()),
Namespace: data.GetNamespace(),
},
Data: certsData,
})
if err != nil {
return fmt.Errorf("failed to upload karmada cert to secret, err: %w", err)
}
klog.V(2).InfoS("[upload-KarmadaCert] Successfully uploaded karmada certs to secret", "karmada", klog.KObj(data))
return nil
}
func runUploadEtcdCert(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("upload-etcdCert task invoked with an invalid data struct")
}
ca := data.GetCert(constants.EtcdCaCertAndKeyName)
server := data.GetCert(constants.EtcdServerCertAndKeyName)
client := data.GetCert(constants.EtcdClientCertAndKeyName)
err := apiclient.CreateOrUpdateSecret(data.RemoteClient(), &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: data.GetNamespace(),
Name: util.EtcdCertSecretName(data.GetName()),
},
Data: map[string][]byte{
ca.CertName(): ca.CertData(),
ca.KeyName(): ca.KeyData(),
server.CertName(): server.CertData(),
server.KeyName(): server.KeyData(),
client.CertName(): client.CertData(),
client.KeyName(): client.KeyData(),
},
})
if err != nil {
return fmt.Errorf("failed to upload etcd certs to secret, err: %w", err)
}
klog.V(2).InfoS("[upload-etcdCert] Successfully uploaded etcd certs to secret", "karmada", klog.KObj(data))
return nil
}
func runUploadWebHookCert(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("upload-webhookCert task invoked with an invalid data struct")
}
cert := data.GetCert(constants.KarmadaCertAndKeyName)
err := apiclient.CreateOrUpdateSecret(data.RemoteClient(), &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: util.WebhookCertSecretName(data.GetName()),
Namespace: data.GetNamespace(),
},
Data: map[string][]byte{
"tls.key": cert.KeyData(),
"tls.crt": cert.CertData(),
},
})
if err != nil {
return fmt.Errorf("failed to upload webhook certs to secret, err: %w", err)
}
klog.V(2).InfoS("[upload-webhookCert] Successfully uploaded webhook certs to secret", "karmada", klog.KObj(data))
return nil
}
func newClientCertConfigFromKubeConfigSpec(notAfter *time.Time) *certs.CertConfig {
return &certs.CertConfig{
Name: "karmada-client",
CAName: constants.CaCertAndKeyName,
Config: certutil.Config{
CommonName: "system:admin",
Organization: []string{"system:masters"},
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
},
NotAfter: notAfter,
}
}

View File

@ -0,0 +1,112 @@
package tasks
import (
"errors"
"fmt"
"time"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/operator/pkg/constants"
"github.com/karmada-io/karmada/operator/pkg/util/apiclient"
"github.com/karmada-io/karmada/operator/pkg/workflow"
)
var (
etcdLabels = labels.Set{"karmada-app": constants.Etcd}
karmadaApiserverLabels = labels.Set{"karmada-app": constants.KarmadaAPIServer}
karmadaAggregatedAPIServerLabels = labels.Set{"karmada-app": constants.KarmadaAggregatedAPIServer}
kubeControllerManagerLabels = labels.Set{"karmada-app": constants.KubeControllerManager}
karmadaControllerManagerLabels = labels.Set{"karmada-app": constants.KarmadaControllerManager}
karmadaSchedulerLablels = labels.Set{"karmada-app": constants.KarmadaScheduler}
karmadaWebhookLabels = labels.Set{"karmada-app": constants.KarmadaWebhook}
)
// NewWaitApiserverTask init wait-apiserver task
func NewWaitApiserverTask() workflow.Task {
return workflow.Task{
Name: "wait-apiserver",
Run: runWaitApiserver,
}
}
func runWaitApiserver(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return fmt.Errorf("wait-aipserver task invoked with an invalid data struct")
}
klog.V(4).InfoS("[wait-aipserver] Running task", "karmada", klog.KObj(data))
waiter := apiclient.NewKarmadaWaiter(data.ControlplaneConifg(), data.RemoteClient(), time.Second*30)
// wait etcd, karmada apiserver and aggregated apiserver to ready
// as long as a replica of pod is ready, we consider the service available.
if err := waiter.WaitForSomePods(etcdLabels.String(), data.GetNamespace(), 1); err != nil {
return fmt.Errorf("waiting for etcd to ready timeout, err: %w", err)
}
if err := waiter.WaitForSomePods(karmadaApiserverLabels.String(), data.GetNamespace(), 1); err != nil {
return fmt.Errorf("waiting for karmada apiserver to ready timeout, err: %w", err)
}
err := waiter.WaitForSomePods(karmadaAggregatedAPIServerLabels.String(), data.GetNamespace(), 1)
if err != nil {
return fmt.Errorf("waiting for karmada aggregated apiserver to ready timeout, err: %w", err)
}
// check whether the karmada apiserver is running and health.
if err := apiclient.TryRunCommand(waiter.WaitForAPI, 3); err != nil {
return fmt.Errorf("the karmada apiserver is unhealth, err: %w", err)
}
klog.V(2).InfoS("[wait-aipserver] the etcd, karmada apiserver and aggregated apiserver is ready", "karmada", klog.KObj(data))
return nil
}
// NewWaitControlPlaneTask init wait-controlPlane task
func NewWaitControlPlaneTask() workflow.Task {
return workflow.Task{
Name: "wait-controlPlane",
Run: runWaitControlPlane,
RunSubTasks: true,
Tasks: []workflow.Task{
newWaitControlPlaneSubTask("KubeControllerManager", kubeControllerManagerLabels),
newWaitControlPlaneSubTask("KarmadaControllerManager", karmadaControllerManagerLabels),
newWaitControlPlaneSubTask("KarmadaScheduler", karmadaSchedulerLablels),
newWaitControlPlaneSubTask("KarmadaWebhook", karmadaWebhookLabels),
},
}
}
func runWaitControlPlane(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("wait-controlPlane task invoked with an invalid data struct")
}
klog.V(4).InfoS("[wait-controlPlane] Running wait-controlPlane task", "karmada", klog.KObj(data))
return nil
}
func newWaitControlPlaneSubTask(component string, lables labels.Set) workflow.Task {
return workflow.Task{
Name: component,
Run: runWaitControlPlaneSubTask(component, lables),
}
}
func runWaitControlPlaneSubTask(component string, lables labels.Set) func(r workflow.RunData) error {
return func(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("wait-controlPlane task invoked with an invalid data struct")
}
waiter := apiclient.NewKarmadaWaiter(nil, data.RemoteClient(), time.Second*120)
if err := waiter.WaitForSomePods(lables.String(), data.GetNamespace(), 1); err != nil {
return fmt.Errorf("waiting for %s to ready timeout, err: %w", component, err)
}
klog.V(2).InfoS("[wait-ControlPlane] component status is ready", "component", component, "karmada", klog.KObj(data))
return nil
}
}

View File

@ -0,0 +1,228 @@
package apiclient
import (
"context"
"errors"
"strings"
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
crdsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
aggregator "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
)
var errAllocated = errors.New("provided port is already allocated")
// NewCRDsClient is to create a clientset ClientSet
func NewCRDsClient(c *rest.Config) (*crdsclient.Clientset, error) {
return crdsclient.NewForConfig(c)
}
// NewAPIRegistrationClient is to create an apiregistration ClientSet
func NewAPIRegistrationClient(c *rest.Config) (*aggregator.Clientset, error) {
return aggregator.NewForConfig(c)
}
// CreateNamespace creates given namespace when the namespace is not existing.
func CreateNamespace(client clientset.Interface, ns *corev1.Namespace) error {
_, err := client.CoreV1().Namespaces().Get(context.TODO(), ns.GetName(), metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return err
}
if _, err := client.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{}); err != nil {
return err
}
}
klog.V(5).InfoS("Successfully created namespace", "namespace", ns.GetName())
return nil
}
// CreateOrUpdateSecret creates a Sercret if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateSecret(client clientset.Interface, secret *corev1.Secret) error {
_, err := client.CoreV1().Secrets(secret.GetNamespace()).Create(context.TODO(), secret, metav1.CreateOptions{})
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return err
}
_, err := client.CoreV1().Secrets(secret.GetNamespace()).Update(context.TODO(), secret, metav1.UpdateOptions{})
if err != nil {
return err
}
}
klog.V(5).InfoS("Successfully created or updated secret", "secret", secret.GetName())
return nil
}
// CreateOrUpdateService creates a Service if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateService(client clientset.Interface, service *corev1.Service) error {
_, err := client.CoreV1().Services(service.GetNamespace()).Create(context.TODO(), service, metav1.CreateOptions{})
if err != nil {
if apierrors.IsAlreadyExists(err) {
_, err := client.CoreV1().Services(service.GetNamespace()).Update(context.TODO(), service, metav1.UpdateOptions{})
return err
}
// Ignore if the Service is invalid with this error message:
// Service "apiserver" is invalid: provided Port is already allocated.
if apierrors.IsInvalid(err) && strings.Contains(err.Error(), errAllocated.Error()) {
klog.V(2).ErrorS(err, "failed to create or update serivce", "service", klog.KObj(service))
return nil
}
return err
}
klog.V(5).InfoS("Successfully created or updated service", "service", service.GetName())
return nil
}
// CreateOrUpdateDeployment creates a Deployment if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateDeployment(client clientset.Interface, deployment *appsv1.Deployment) error {
_, err := client.AppsV1().Deployments(deployment.GetNamespace()).Create(context.TODO(), deployment, metav1.CreateOptions{})
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return err
}
_, err := client.AppsV1().Deployments(deployment.GetNamespace()).Update(context.TODO(), deployment, metav1.UpdateOptions{})
if err != nil {
return err
}
}
klog.V(5).InfoS("Successfully created or updated deployment", "deployment", deployment.GetName())
return nil
}
// CreateOrUpdateMutatingWebhookConfiguration creates a MutatingWebhookConfiguration if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateMutatingWebhookConfiguration(client clientset.Interface, mwc *admissionregistrationv1.MutatingWebhookConfiguration) error {
_, err := client.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(context.TODO(), mwc, metav1.CreateOptions{})
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return err
}
older, err := client.AdmissionregistrationV1().MutatingWebhookConfigurations().Get(context.TODO(), mwc.GetName(), metav1.GetOptions{})
if err != nil {
return err
}
mwc.ResourceVersion = older.ResourceVersion
_, err = client.AdmissionregistrationV1().MutatingWebhookConfigurations().Update(context.TODO(), mwc, metav1.UpdateOptions{})
if err != nil {
return err
}
}
klog.V(5).InfoS("Successfully created or updated mutatingWebhookConfiguration", "mutatingWebhookConfiguration", mwc.GetName())
return nil
}
// CreateOrUpdateValidatingWebhookConfiguration creates a ValidatingWebhookConfiguration if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateValidatingWebhookConfiguration(client clientset.Interface, vwc *admissionregistrationv1.ValidatingWebhookConfiguration) error {
_, err := client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Create(context.TODO(), vwc, metav1.CreateOptions{})
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return err
}
older, err := client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Get(context.TODO(), vwc.GetName(), metav1.GetOptions{})
if err != nil {
return err
}
vwc.ResourceVersion = older.ResourceVersion
_, err = client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Update(context.TODO(), vwc, metav1.UpdateOptions{})
if err != nil {
return err
}
}
klog.V(5).InfoS("Successfully created or updated validatingWebhookConfiguration", "validatingWebhookConfiguration", vwc.GetName())
return nil
}
// CreateOrUpdateAPIService creates a APIService if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateAPIService(apiRegistrationClient *aggregator.Clientset, apiservice *apiregistrationv1.APIService) error {
_, err := apiRegistrationClient.ApiregistrationV1().APIServices().Create(context.TODO(), apiservice, metav1.CreateOptions{})
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return err
}
older, err := apiRegistrationClient.ApiregistrationV1().APIServices().Get(context.TODO(), apiservice.GetName(), metav1.GetOptions{})
if err != nil {
return err
}
apiservice.ResourceVersion = older.ResourceVersion
_, err = apiRegistrationClient.ApiregistrationV1().APIServices().Update(context.TODO(), apiservice, metav1.UpdateOptions{})
if err != nil {
return err
}
}
klog.V(5).Infof("Successfully created or updated APIService", "APIService", apiservice.Name)
return nil
}
// CreateCustomResourceDefinitionIfNeed creates a CustomResourceDefinition if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateCustomResourceDefinitionIfNeed(client *crdsclient.Clientset, obj *apiextensionsv1.CustomResourceDefinition) error {
crdClient := client.ApiextensionsV1().CustomResourceDefinitions()
if _, err := crdClient.Create(context.TODO(), obj, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
return err
}
klog.V(5).InfoS("Skip already exist crd", "crd", obj.Name)
return nil
}
klog.V(5).InfoS("Successfully created crd", "crd", obj.Name)
return nil
}
// PatchCustomResourceDefinition patchs a crd resource.
func PatchCustomResourceDefinition(client *crdsclient.Clientset, name string, data []byte) error {
crd := client.ApiextensionsV1().CustomResourceDefinitions()
if _, err := crd.Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{}); err != nil {
return err
}
klog.V(5).InfoS("Successfully patched crd", "crd", name)
return nil
}
// CreateOrUpdateStatefulSet creates a StatefulSet if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateStatefulSet(client clientset.Interface, statefuleSet *appsv1.StatefulSet) error {
_, err := client.AppsV1().StatefulSets(statefuleSet.GetNamespace()).Create(context.TODO(), statefuleSet, metav1.CreateOptions{})
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return err
}
older, err := client.AppsV1().StatefulSets(statefuleSet.GetNamespace()).Get(context.TODO(), statefuleSet.GetName(), metav1.GetOptions{})
if err != nil {
return err
}
statefuleSet.ResourceVersion = older.ResourceVersion
_, err = client.AppsV1().StatefulSets(statefuleSet.GetNamespace()).Update(context.TODO(), statefuleSet, metav1.UpdateOptions{})
if err != nil {
return err
}
}
klog.V(5).InfoS("Successfully created or updated statefulset", "statefulset", statefuleSet.GetName)
return nil
}

View File

@ -0,0 +1,181 @@
package apiclient
import (
"context"
"net/http"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
apiregistrationv1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
aggregator "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
)
const (
// APICallRetryInterval defines how long kubeadm should wait before retrying a failed API operation
APICallRetryInterval = 500 * time.Millisecond
// APIServiceName defines the karmada aggregated apiserver APISerivce resource name.
APIServiceName = "v1alpha1.cluster.karmada.io"
)
// Waiter is an interface for waiting for criteria in Karmada to happen
type Waiter interface {
// WaitForAPI waits for the API Server's /healthz endpoint to become "ok"
WaitForAPI() error
// WaitForAPIService waits for the APIService condition to become "true"
WaitForAPIService() error
// WaitForPods waits for Pods in the namespace to become Ready
WaitForPods(label, namespace string) error
// WaitForSomePods waits for the specified number of Pods in the namespace to become Ready
WaitForSomePods(label, namespace string, podNum int32) error
// SetTimeout adjusts the timeout to the specified duration
SetTimeout(timeout time.Duration)
}
// KarmadaWaiter is an implementation of Waiter that is backed by a Kubernetes client
type KarmadaWaiter struct {
karmadaConfig *rest.Config
client clientset.Interface
timeout time.Duration
}
// NewKarmadaWaiter reurn a karmada waiter, the rest config is to create crd client or aggregate client.
func NewKarmadaWaiter(config *rest.Config, client clientset.Interface, timeout time.Duration) Waiter {
return &KarmadaWaiter{
karmadaConfig: config,
client: client,
timeout: timeout,
}
}
// WaitForAPI waits for the API Server's /healthz endpoint to report "ok"
func (w *KarmadaWaiter) WaitForAPI() error {
return wait.PollImmediate(APICallRetryInterval, w.timeout, func() (bool, error) {
healthStatus := 0
w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&healthStatus)
if healthStatus != http.StatusOK {
return false, nil
}
return true, nil
})
}
// WaitForAPIService waits for the APIService condition to become "true"
func (w *KarmadaWaiter) WaitForAPIService() error {
aggregateClient, err := aggregator.NewForConfig(w.karmadaConfig)
if err != nil {
return err
}
err = wait.PollImmediate(APICallRetryInterval, w.timeout, func() (done bool, err error) {
apiService, err := aggregateClient.ApiregistrationV1().APIServices().Get(context.TODO(), APIServiceName, metav1.GetOptions{})
if err != nil {
return false, nil
}
if apiregistrationv1helper.IsAPIServiceConditionTrue(apiService, apiregistrationv1.Available) {
return true, nil
}
return false, nil
})
if err != nil {
return err
}
return nil
}
// WaitForPods will lookup pods with the given label and wait until they are all
// reporting status as running.
func (w *KarmadaWaiter) WaitForPods(label, namespace string) error {
lastKnownPodNumber := -1
return wait.PollImmediate(APICallRetryInterval, w.timeout, func() (bool, error) {
listOpts := metav1.ListOptions{LabelSelector: label}
pods, err := w.client.CoreV1().Pods(namespace).List(context.TODO(), listOpts)
if err != nil {
return false, nil
}
if lastKnownPodNumber != len(pods.Items) {
lastKnownPodNumber = len(pods.Items)
}
if len(pods.Items) == 0 {
return false, nil
}
for _, pod := range pods.Items {
if !isPodRunning(pod) {
return false, nil
}
}
return true, nil
})
}
// WaitForSomePods lookup pods with the given label and wait until desired number of pods
// reporting status as running.
func (w *KarmadaWaiter) WaitForSomePods(label, namespace string, podNum int32) error {
return wait.PollImmediate(APICallRetryInterval, w.timeout, func() (bool, error) {
listOpts := metav1.ListOptions{LabelSelector: label}
pods, err := w.client.CoreV1().Pods(namespace).List(context.TODO(), listOpts)
if err != nil {
return false, nil
}
if len(pods.Items) == 0 {
return false, nil
}
var expected int32
for _, pod := range pods.Items {
if isPodRunning(pod) {
expected++
}
}
return expected >= podNum, nil
})
}
// SetTimeout adjusts the timeout to the specified duration
func (w *KarmadaWaiter) SetTimeout(timeout time.Duration) {
w.timeout = timeout
}
// TryRunCommand runs a function a maximum of failureThreshold times, and retries on error. If failureThreshold is hit; the last error is returned
func TryRunCommand(f func() error, failureThreshold int) error {
backoff := wait.Backoff{
Duration: 5 * time.Second,
Factor: 2, // double the timeout for every failure
Steps: failureThreshold,
}
return wait.ExponentialBackoff(backoff, func() (bool, error) {
err := f()
if err != nil {
// Retry until the timeout
return false, nil
}
// The last f() call was a success, return cleanly
return true, nil
})
}
func isPodRunning(pod corev1.Pod) bool {
if pod.Status.Phase != corev1.PodRunning || pod.DeletionTimestamp != nil {
return false
}
for _, condtion := range pod.Status.Conditions {
if condtion.Type == corev1.PodReady && condtion.Status == corev1.ConditionTrue {
return true
}
}
return false
}

View File

@ -0,0 +1,40 @@
package util
import (
"fmt"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
// CreateWithCerts creates a KubeConfig object with access to the API server with client certificates
func CreateWithCerts(serverURL, clusterName, userName string, caCert []byte, clientKey []byte, clientCert []byte) *clientcmdapi.Config {
config := CreateBasic(serverURL, clusterName, userName, caCert)
config.AuthInfos[userName] = &clientcmdapi.AuthInfo{
ClientKeyData: clientKey,
ClientCertificateData: clientCert,
}
return config
}
// CreateBasic creates a basic, general KubeConfig object that then can be extended
func CreateBasic(serverURL, clusterName, userName string, caCert []byte) *clientcmdapi.Config {
// Use the cluster and the username as the context name
contextName := fmt.Sprintf("%s@%s", userName, clusterName)
return &clientcmdapi.Config{
Clusters: map[string]*clientcmdapi.Cluster{
clusterName: {
Server: serverURL,
CertificateAuthorityData: caCert,
},
},
Contexts: map[string]*clientcmdapi.Context{
contextName: {
Cluster: clusterName,
AuthInfo: userName,
},
},
AuthInfos: map[string]*clientcmdapi.AuthInfo{},
CurrentContext: contextName,
}
}

74
operator/pkg/util/name.go Normal file
View File

@ -0,0 +1,74 @@
package util
import (
"fmt"
"strings"
)
// AdminKubeconfigSercretName return a secret name of karmada admin kubeconfig
func AdminKubeconfigSercretName(name string) string {
return generateResourceName(name, "admin-config")
}
// KarmadaCertSecretName return a secret name of karmada certs
func KarmadaCertSecretName(name string) string {
return generateResourceName(name, "cert")
}
// EtcdCertSecretName return a secret name of etcd cert
func EtcdCertSecretName(name string) string {
return generateResourceName(name, "etcd-cert")
}
// WebhookCertSecretName return secret name of karmada webhook cert
func WebhookCertSecretName(name string) string {
return generateResourceName(name, "webhook-cert")
}
// KarmadaAPIServerName return secret name of karmada apiserver
func KarmadaAPIServerName(name string) string {
return generateResourceName(name, "apiserver")
}
// KarmadaAggratedAPIServerName return secret name of karmada aggregated apiserver
func KarmadaAggratedAPIServerName(name string) string {
return generateResourceName(name, "aggregated-apiserver")
}
// KarmadaEtcdName return karmada etcd name
func KarmadaEtcdName(name string) string {
return generateResourceName(name, "etcd")
}
// KarmadaEtcdClientName return karmada etcd client name
func KarmadaEtcdClientName(name string) string {
return generateResourceName(name, "etcd-client")
}
// KubeControllerManagerName return name of kube controller manager name of karmada
func KubeControllerManagerName(name string) string {
return generateResourceName(name, "kube-controller-manager")
}
// KarmadaControllerManagerName return karmada controller manager name
func KarmadaControllerManagerName(name string) string {
return generateResourceName(name, "controller-manager")
}
// KarmadaSchedulerName return karmada scheduler name
func KarmadaSchedulerName(name string) string {
return generateResourceName(name, "scheduler")
}
// KarmadaWebhookName return karmada webhook name
func KarmadaWebhookName(name string) string {
return generateResourceName(name, "webhook")
}
func generateResourceName(name, suffix string) string {
if strings.Contains(name, "karmada") {
return fmt.Sprintf("%s-%s", name, suffix)
}
return fmt.Sprintf("%s-karmada-%s", name, suffix)
}

View File

@ -0,0 +1,21 @@
package util
import (
"bytes"
"fmt"
"text/template"
)
// ParseTemplate validates and parses passed as argument template
func ParseTemplate(strtmpl string, obj interface{}) ([]byte, error) {
var buf bytes.Buffer
tmpl, err := template.New("template").Parse(strtmpl)
if err != nil {
return nil, fmt.Errorf("error when parsing template, err: %w", err)
}
err = tmpl.Execute(&buf, obj)
if err != nil {
return nil, fmt.Errorf("error when executing template, err: %w", err)
}
return buf.Bytes(), nil
}

181
operator/pkg/util/util.go Normal file
View File

@ -0,0 +1,181 @@
package util
import (
"archive/tar"
"compress/gzip"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"regexp"
"time"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"
)
// Downloader Download progress
type Downloader struct {
io.Reader
Total int64
Current int64
}
// Read Implementation of Downloader
func (d *Downloader) Read(p []byte) (n int, err error) {
n, err = d.Reader.Read(p)
if err != nil {
if err != io.EOF {
return
}
klog.Info("\nDownload complete.")
return
}
d.Current += int64(n)
klog.Info("\rDownloading...[ %.2f%% ]", float64(d.Current*10000/d.Total)/100)
return
}
// DownloadFile Download files via URL
func DownloadFile(url, filePath string) error {
httpClient := http.Client{
Timeout: 60 * time.Second,
}
resp, err := httpClient.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("failed download file. url: %s code: %v", url, resp.StatusCode)
}
file, err := os.Create(filePath)
if err != nil {
return err
}
defer file.Close()
downloader := &Downloader{
Reader: resp.Body,
Total: resp.ContentLength,
}
if _, err := io.Copy(file, downloader); err != nil {
return err
}
return nil
}
// Unpack unpack a given file to target path
func Unpack(file, targetPath string) error {
r, err := os.Open(file)
if err != nil {
return err
}
defer r.Close()
gr, err := gzip.NewReader(r)
if err != nil {
return fmt.Errorf("new reader failed. %v", err)
}
defer gr.Close()
tr := tar.NewReader(gr)
for {
header, err := tr.Next()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return err
}
switch header.Typeflag {
case tar.TypeDir:
if err := os.Mkdir(targetPath+"/"+header.Name, 0755); err != nil {
return err
}
case tar.TypeReg:
outFile, err := os.Create(targetPath + "/" + header.Name)
if err != nil {
return err
}
if err := ioCopyN(outFile, tr); err != nil {
return err
}
outFile.Close()
default:
fmt.Printf("uknown type: %v in %s\n", header.Typeflag, header.Name)
}
}
return nil
}
// ioCopyN fix Potential DoS vulnerability via decompression bomb.
func ioCopyN(outFile *os.File, tr *tar.Reader) error {
for {
if _, err := io.CopyN(outFile, tr, 1024); err != nil {
if errors.Is(err, io.EOF) {
break
}
return err
}
}
return nil
}
// ListFiles traverse directory files
func ListFiles(path string) []os.FileInfo {
var files []os.FileInfo
if err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
if !info.IsDir() {
files = append(files, info)
}
return nil
}); err != nil {
fmt.Println(err)
}
return files
}
// PathExists check whether the path is exist
func PathExists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
// ReadYamlFile ready file given path with yaml format
func ReadYamlFile(path string) ([]byte, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
return yaml.YAMLToJSON(data)
}
// RelpaceYamlForReg replace content of yaml file with a Regexp
func RelpaceYamlForReg(path, destResource string, reg *regexp.Regexp) ([]byte, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
repl := reg.ReplaceAllString(string(data), destResource)
return yaml.YAMLToJSON([]byte(repl))
}

View File

@ -0,0 +1,95 @@
package workflow
import (
"k8s.io/klog/v2"
)
// RunData is a interface represents all of runDatas abstract object.
type RunData = interface{}
// Job represents a executable workflow, it has list of tasks.
// these tasks must be execution order. if one of these tasks throws
// error, the entire job will fail. During the workflow,if there are
// some artifacts, we can store it to runData.
type Job struct {
Tasks []Task
runData RunData
runDataInitializer func() (RunData, error)
}
// NewJob return a Job with task array.
func NewJob() *Job {
return &Job{
Tasks: []Task{},
}
}
// AppendTask append a task to job, a job has a list of task.
func (j *Job) AppendTask(t Task) {
j.Tasks = append(j.Tasks, t)
}
func (j *Job) initData() (RunData, error) {
if j.runData == nil && j.runDataInitializer != nil {
var err error
if j.runData, err = j.runDataInitializer(); err != nil {
klog.ErrorS(err, "failed to initialize running data")
return nil, err
}
}
return j.runData, nil
}
// SetDataInitializer set a initialize runData function to job.
func (j *Job) SetDataInitializer(build func() (RunData, error)) {
j.runDataInitializer = build
}
// Run start execte job workflow. if the task has sub task, it will
// recursive call the sub tasks util all of task be completed or error be thrown.
func (j *Job) Run() error {
runData := j.runData
if runData == nil {
if _, err := j.initData(); err != nil {
return err
}
}
for _, t := range j.Tasks {
if err := run(t, j.runData); err != nil {
return err
}
}
return nil
}
func run(t Task, data RunData) error {
if t.Skip != nil {
skip, err := t.Skip(data)
if err != nil {
return err
}
if skip {
return nil
}
}
if t.Run != nil {
if err := t.Run(data); err != nil {
return err
}
if t.RunSubTasks {
for _, p := range t.Tasks {
if err := run(p, data); err != nil {
return err
}
}
}
}
return nil
}

View File

@ -0,0 +1,17 @@
package workflow
// Task is minimum unit workflow. It is sample tree structrue.
// we can set a list of sub tasks, they will all be executed if
// RunSubTasks is true.
type Task struct {
Name string
Run func(data RunData) error
Skip func(data RunData) (bool, error)
Tasks []Task
RunSubTasks bool
}
// AppendTask append a sub task.
func (t *Task) AppendTask(task Task) {
t.Tasks = append(t.Tasks, task)
}