add karmada agent (#226)

Signed-off-by: lihanbo <lihanbo2@huawei.com>
This commit is contained in:
Hanbo Li 2021-03-24 14:51:13 +08:00 committed by GitHub
parent c2bfd80f71
commit 59a02e0030
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 722 additions and 245 deletions

View File

@ -24,7 +24,7 @@ ifeq ($(VERSION), "")
endif
endif
all: karmada-controller-manager karmada-scheduler karmadactl karmada-webhook
all: karmada-controller-manager karmada-scheduler karmadactl karmada-webhook karmada-agent
karmada-controller-manager: $(SOURCES)
CGO_ENABLED=0 GOOS=$(GOOS) go build \
@ -50,8 +50,14 @@ karmada-webhook: $(SOURCES)
-o karmada-webhook \
cmd/webhook/main.go
karmada-agent: $(SOURCES)
CGO_ENABLED=0 GOOS=$(GOOS) go build \
-ldflags $(LDFLAGS) \
-o karmada-agent \
cmd/agent/main.go
clean:
rm -rf karmada-controller-manager karmada-scheduler karmadactl karmada-webhook
rm -rf karmada-controller-manager karmada-scheduler karmadactl karmada-webhook karmada-agent
.PHONY: update
update:
@ -65,7 +71,7 @@ verify:
test:
go test --race --v ./pkg/...
images: image-karmada-controller-manager image-karmada-scheduler image-karmada-webhook
images: image-karmada-controller-manager image-karmada-scheduler image-karmada-webhook image-karmada-agent
image-karmada-controller-manager: karmada-controller-manager
cp karmada-controller-manager cluster/images/karmada-controller-manager && \
@ -82,6 +88,11 @@ image-karmada-webhook: karmada-webhook
docker build -t $(REGISTRY)/karmada-webhook:$(VERSION) cluster/images/karmada-webhook && \
rm cluster/images/karmada-webhook/karmada-webhook
image-karmada-agent: karmada-agent
cp karmada-agent cluster/images/karmada-agent && \
docker build -t $(REGISTRY)/karmada-agent:$(VERSION) cluster/images/karmada-agent && \
rm cluster/images/karmada-agent/karmada-agent
upload-images: images
@echo "push images to $(REGISTRY)"
ifneq ($(REGISTRY_USER_NAME), "")

View File

@ -0,0 +1,10 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: karmada-agent
rules:
- apiGroups: ['*']
resources: ['*']
verbs: ["get", "watch", "list", "create", "update", "delete"]
- nonResourceURLs: ['*']
verbs: ["get"]

View File

@ -0,0 +1,12 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: karmada-agent
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: karmada-agent
subjects:
- kind: ServiceAccount
name: karmada-agent-sa
namespace: karmada-system

View File

@ -0,0 +1,37 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: karmada-agent
namespace: karmada-system
labels:
app: karmada-agent
spec:
replicas: 1
selector:
matchLabels:
app: karmada-agent
template:
metadata:
labels:
app: karmada-agent
spec:
serviceAccountName: karmada-agent-sa
tolerations:
- key: node-role.kubernetes.io/master
operator: Exists
containers:
- name: karmada-agent
image: swr.ap-southeast-1.myhuaweicloud.com/karmada/karmada-agent:latest
imagePullPolicy: IfNotPresent
command:
- /bin/karmada-agent
- --karmada-kubeconfig=/etc/kubeconfig/karmada-kubeconfig
- --cluster-name={{member_cluster_name}}
volumeMounts:
- name: kubeconfig
mountPath: /etc/kubeconfig
volumes:
- name: kubeconfig
secret:
secretName: karmada-kubeconfig

View File

@ -0,0 +1,4 @@
apiVersion: v1
kind: Namespace
metadata:
name: karmada-system

View File

@ -0,0 +1,5 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: karmada-agent-sa
namespace: karmada-system

View File

@ -0,0 +1,7 @@
FROM alpine:3.7
RUN apk add --no-cache ca-certificates
ADD karmada-agent /bin/
CMD ["/bin/karmada-agent"]

164
cmd/agent/app/agent.go Normal file
View File

@ -0,0 +1,164 @@
package app
import (
"flag"
"fmt"
"os"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/karmada-io/karmada/cmd/agent/app/options"
clusterapi "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/controllers/execution"
"github.com/karmada-io/karmada/pkg/controllers/status"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
"github.com/karmada-io/karmada/pkg/karmadactl"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
)
// NewAgentCommand creates a *cobra.Command object with default parameters
func NewAgentCommand(stopChan <-chan struct{}) *cobra.Command {
opts := options.NewOptions()
cmd := &cobra.Command{
Use: "karmada-agent",
Long: `The karmada agent runs the cluster registration agent`,
Run: func(cmd *cobra.Command, args []string) {
if err := run(opts, stopChan); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
opts.AddFlags(cmd.Flags())
cmd.Flags().AddGoFlagSet(flag.CommandLine)
return cmd
}
func run(opts *options.Options, stopChan <-chan struct{}) error {
controlPlaneRestConfig, err := clientcmd.BuildConfigFromFlags("", opts.KarmadaKubeConfig)
if err != nil {
return fmt.Errorf("error building kubeconfig of karmada control plane: %s", err.Error())
}
err = registerWithControlPlaneAPIServer(controlPlaneRestConfig, opts.ClusterName)
if err != nil {
return fmt.Errorf("failed to register with karmada control plane: %s", err.Error())
}
executionSpace, err := names.GenerateExecutionSpaceName(opts.ClusterName)
if err != nil {
klog.Errorf("Failed to generate execution space name for member cluster %s, err is %v", opts.ClusterName, err)
return err
}
controllerManager, err := controllerruntime.NewManager(controlPlaneRestConfig, controllerruntime.Options{
Scheme: gclient.NewSchema(),
Namespace: executionSpace,
LeaderElection: false,
LeaderElectionID: "agent.karmada.io",
})
if err != nil {
klog.Errorf("failed to build controller manager: %v", err)
return err
}
setupControllers(controllerManager, controlPlaneRestConfig, opts.ClusterName, stopChan)
// blocks until the stop channel is closed.
if err := controllerManager.Start(stopChan); err != nil {
klog.Errorf("controller manager exits unexpectedly: %v", err)
return err
}
return nil
}
func setupControllers(mgr controllerruntime.Manager, controlPlaneRestConfig *restclient.Config, clusterName string, stopChan <-chan struct{}) {
kubeClientSet := kubernetes.NewForConfigOrDie(controlPlaneRestConfig)
predicateFun := predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return createEvent.Meta.GetName() == clusterName
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
return updateEvent.MetaOld.GetName() == clusterName
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return deleteEvent.Meta.GetName() == clusterName
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
},
}
clusterStatusController := &status.ClusterStatusController{
Client: mgr.GetClient(),
KubeClientSet: kubeClientSet,
EventRecorder: mgr.GetEventRecorderFor(status.ControllerName),
PredicateFunc: predicateFun,
ClusterClientSetFunc: util.NewClusterClientSetForAgent,
}
if err := clusterStatusController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup cluster status controller: %v", err)
}
objectWatcher := objectwatcher.NewObjectWatcher(kubeClientSet, mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent)
executionController := &execution.Controller{
Client: mgr.GetClient(),
KubeClientSet: kubeClientSet,
EventRecorder: mgr.GetEventRecorderFor(execution.ControllerName),
RESTMapper: mgr.GetRESTMapper(),
ObjectWatcher: objectWatcher,
PredicateFunc: predicate.Funcs{},
ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent,
}
if err := executionController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup execution controller: %v", err)
}
workStatusController := &status.WorkStatusController{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(status.WorkStatusControllerName),
RESTMapper: mgr.GetRESTMapper(),
KubeClientSet: kubeClientSet,
InformerManager: informermanager.NewMultiClusterInformerManager(),
StopChan: stopChan,
WorkerNumber: 1,
ObjectWatcher: objectWatcher,
PredicateFunc: predicate.Funcs{},
ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent,
}
workStatusController.RunWorkQueue()
if err := workStatusController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup work status controller: %v", err)
}
}
func registerWithControlPlaneAPIServer(controlPlaneRestConfig *restclient.Config, memberClusterName string) error {
karmadaClient := karmadaclientset.NewForConfigOrDie(controlPlaneRestConfig)
clusterObj := &clusterapi.Cluster{}
clusterObj.Name = memberClusterName
clusterObj.Spec.SyncMode = clusterapi.Pull
_, err := karmadactl.CreateClusterObject(karmadaClient, clusterObj, false)
if err != nil {
klog.Errorf("failed to create cluster object. cluster name: %s, error: %v", memberClusterName, err)
return err
}
return nil
}

View File

@ -0,0 +1,26 @@
package options
import (
"github.com/spf13/pflag"
)
// Options contains everything necessary to create and run controller-manager.
type Options struct {
KarmadaKubeConfig string
ClusterName string
}
// NewOptions builds an default scheduler options.
func NewOptions() *Options {
return &Options{}
}
// AddFlags adds flags of scheduler to the specified FlagSet
func (o *Options) AddFlags(fs *pflag.FlagSet) {
if o == nil {
return
}
fs.StringVar(&o.KarmadaKubeConfig, "karmada-kubeconfig", o.KarmadaKubeConfig, "Path to karmada kubeconfig.")
fs.StringVar(&o.ClusterName, "cluster-name", o.ClusterName, "Name of member cluster that the agent serves for.")
}

20
cmd/agent/main.go Normal file
View File

@ -0,0 +1,20 @@
package main
import (
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/component-base/logs"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/cmd/agent/app"
)
func main() {
logs.InitLogs()
defer logs.FlushLogs()
stopChan := apiserver.SetupSignalHandler()
if err := app.NewAgentCommand(stopChan).Execute(); err != nil {
klog.Fatal(err.Error())
}
}

View File

@ -12,9 +12,13 @@ import (
"k8s.io/component-base/logs"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/karmada-io/karmada/cmd/controller-manager/app/options"
"github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/controllers/binding"
"github.com/karmada-io/karmada/pkg/controllers/cluster"
"github.com/karmada-io/karmada/pkg/controllers/execution"
@ -26,6 +30,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/detector"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
)
@ -94,7 +99,7 @@ func setupControllers(mgr controllerruntime.Manager, stopChan <-chan struct{}) {
dynamicClientSet := dynamic.NewForConfigOrDie(resetConfig)
kubeClientSet := kubernetes.NewForConfigOrDie(resetConfig)
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), kubeClientSet, mgr.GetRESTMapper())
objectWatcher := objectwatcher.NewObjectWatcher(kubeClientSet, mgr.GetRESTMapper(), util.NewClusterDynamicClientSet)
overridemanager := overridemanager.New(mgr.GetClient())
resourceDetector := &detector.ResourceDetector{
@ -109,22 +114,42 @@ func setupControllers(mgr controllerruntime.Manager, stopChan <-chan struct{}) {
klog.Fatalf("Failed to setup resource detector: %v", err)
}
ClusterController := &cluster.Controller{
clusterController := &cluster.Controller{
Client: mgr.GetClient(),
KubeClientSet: kubeClientSet,
EventRecorder: mgr.GetEventRecorderFor(cluster.ControllerName),
}
if err := ClusterController.SetupWithManager(mgr); err != nil {
if err := clusterController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup cluster controller: %v", err)
}
ClusterStatusController := &status.ClusterStatusController{
Client: mgr.GetClient(),
KubeClientSet: kubeClientSet,
EventRecorder: mgr.GetEventRecorderFor(status.ControllerName),
clusterPredicateFunc := predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
obj := createEvent.Object.(*v1alpha1.Cluster)
return obj.Spec.SyncMode == v1alpha1.Push
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
obj := updateEvent.ObjectNew.(*v1alpha1.Cluster)
return obj.Spec.SyncMode == v1alpha1.Push
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
obj := deleteEvent.Object.(*v1alpha1.Cluster)
return obj.Spec.SyncMode == v1alpha1.Push
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
},
}
if err := ClusterStatusController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup clusterstatus controller: %v", err)
clusterStatusController := &status.ClusterStatusController{
Client: mgr.GetClient(),
KubeClientSet: kubeClientSet,
EventRecorder: mgr.GetEventRecorderFor(status.ControllerName),
PredicateFunc: clusterPredicateFunc,
ClusterClientSetFunc: util.NewClusterClientSet,
}
if err := clusterStatusController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup cluster status controller: %v", err)
}
hpaController := &hpa.HorizontalPodAutoscalerController{
@ -166,27 +191,32 @@ func setupControllers(mgr controllerruntime.Manager, stopChan <-chan struct{}) {
klog.Fatalf("Failed to setup cluster resource binding controller: %v", err)
}
workPredicateFunc := newPredicateFuncsForWork(mgr)
executionController := &execution.Controller{
Client: mgr.GetClient(),
KubeClientSet: kubeClientSet,
EventRecorder: mgr.GetEventRecorderFor(execution.ControllerName),
RESTMapper: mgr.GetRESTMapper(),
ObjectWatcher: objectWatcher,
Client: mgr.GetClient(),
KubeClientSet: kubeClientSet,
EventRecorder: mgr.GetEventRecorderFor(execution.ControllerName),
RESTMapper: mgr.GetRESTMapper(),
ObjectWatcher: objectWatcher,
PredicateFunc: workPredicateFunc,
ClusterClientSetFunc: util.NewClusterDynamicClientSet,
}
if err := executionController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup execution controller: %v", err)
}
workStatusController := &status.WorkStatusController{
Client: mgr.GetClient(),
DynamicClient: dynamicClientSet,
EventRecorder: mgr.GetEventRecorderFor(status.WorkStatusControllerName),
RESTMapper: mgr.GetRESTMapper(),
KubeClientSet: kubeClientSet,
InformerManager: informermanager.NewMultiClusterInformerManager(),
StopChan: stopChan,
WorkerNumber: 1,
ObjectWatcher: objectWatcher,
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(status.WorkStatusControllerName),
RESTMapper: mgr.GetRESTMapper(),
KubeClientSet: kubeClientSet,
InformerManager: informermanager.NewMultiClusterInformerManager(),
StopChan: stopChan,
WorkerNumber: 1,
ObjectWatcher: objectWatcher,
PredicateFunc: workPredicateFunc,
ClusterClientSetFunc: util.NewClusterDynamicClientSet,
}
workStatusController.RunWorkQueue()
if err := workStatusController.SetupWithManager(mgr); err != nil {
@ -201,3 +231,56 @@ func setupControllers(mgr controllerruntime.Manager, stopChan <-chan struct{}) {
klog.Fatalf("Failed to setup namespace sync controller: %v", err)
}
}
func newPredicateFuncsForWork(mgr controllerruntime.Manager) predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
obj := createEvent.Object.(*workv1alpha1.Work)
clusterName, err := names.GetClusterName(obj.Namespace)
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", obj.Namespace, obj.Name)
return false
}
cluster, err := util.GetCluster(mgr.GetClient(), clusterName)
if err != nil {
klog.Errorf("Failed to get the given member cluster %s", clusterName)
return false
}
return cluster.Spec.SyncMode == v1alpha1.Push
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
obj := updateEvent.ObjectNew.(*workv1alpha1.Work)
clusterName, err := names.GetClusterName(obj.Namespace)
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", obj.Namespace, obj.Name)
return false
}
cluster, err := util.GetCluster(mgr.GetClient(), clusterName)
if err != nil {
klog.Errorf("Failed to get the given member cluster %s", clusterName)
return false
}
return cluster.Spec.SyncMode == v1alpha1.Push
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
obj := deleteEvent.Object.(*workv1alpha1.Work)
clusterName, err := names.GetClusterName(obj.Namespace)
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", obj.Namespace, obj.Name)
return false
}
cluster, err := util.GetCluster(mgr.GetClient(), clusterName)
if err != nil {
klog.Errorf("Failed to get the given member cluster %s", clusterName)
return false
}
return cluster.Spec.SyncMode == v1alpha1.Push
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
},
}
}

View File

@ -45,14 +45,14 @@ func NewSchedulerCommand(stopChan <-chan struct{}) *cobra.Command {
func run(opts *options.Options, stopChan <-chan struct{}) error {
go serveHealthz(fmt.Sprintf("%s:%d", opts.BindAddress, opts.SecurePort))
resetConfig, err := clientcmd.BuildConfigFromFlags(opts.Master, opts.KubeConfig)
restConfig, err := clientcmd.BuildConfigFromFlags(opts.Master, opts.KubeConfig)
if err != nil {
return fmt.Errorf("error building kubeconfig: %s", err.Error())
}
dynamicClientSet := dynamic.NewForConfigOrDie(resetConfig)
karmadaClient := karmadaclientset.NewForConfigOrDie(resetConfig)
kubeClientSet := kubernetes.NewForConfigOrDie(resetConfig)
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
kubeClientSet := kubernetes.NewForConfigOrDie(restConfig)
ctx, cancel := context.WithCancel(context.Background())
go func() {
@ -67,7 +67,7 @@ func run(opts *options.Options, stopChan <-chan struct{}) error {
return fmt.Errorf("scheduler exited")
}
leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(resetConfig, "leader-election"))
leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(restConfig, "leader-election"))
if err != nil {
return err
}

50
hack/deploy-karmada-agent.sh Executable file
View File

@ -0,0 +1,50 @@
#!/bin/bash
set -o errexit
set -o nounset
REPO_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
KARMADA_APISERVER_KUBECONFIG=${KARMADA_APISERVER_KUBECONFIG:-"/var/run/karmada/karmada-apiserver.config"}
# The host cluster name which used to install karmada control plane components.
MEMBER_CLUSTER_NAME=${MEMBER_CLUSTER_NAME:-"member3"}
MEMBER_CLUSTER_KUBECONFIG=${MEMBER_CLUSTER_KUBECONFIG:-"${HOME}/.kube/member3.config"}
AGENT_POD_LABEL="karmada-agent"
source ${REPO_ROOT}/hack/util.sh
function usage() {
echo "This script will deploy karmada agent to a cluster."
echo "Usage: hack/deploy-karmada-agent.sh"
echo "Example: hack/deploy-karmada.sh"
}
export REGISTRY="swr.ap-southeast-1.myhuaweicloud.com/karmada"
export VERSION="latest"
kind load docker-image "${REGISTRY}/karmada-agent:${VERSION}" --name="${MEMBER_CLUSTER_NAME}"
export KUBECONFIG="${MEMBER_CLUSTER_KUBECONFIG}"
# create namespace for karmada agent
kubectl apply -f "${REPO_ROOT}/artifacts/agent/namespace.yaml"
# create service account, cluster role for karmada agent
kubectl apply -f "${REPO_ROOT}/artifacts/agent/serviceaccount.yaml"
kubectl apply -f "${REPO_ROOT}/artifacts/agent/clusterrole.yaml"
kubectl apply -f "${REPO_ROOT}/artifacts/agent/clusterrolebinding.yaml"
# create secret
if [[ ! -e ${KARMADA_APISERVER_KUBECONFIG} ]]; then
echo "the kubeconfig file of karmada control plane not exist"
exit 1
fi
kubectl create secret generic karmada-kubeconfig --from-file=karmada-kubeconfig="$KARMADA_APISERVER_KUBECONFIG" -n karmada-system
# deploy karmada agent
cp "${REPO_ROOT}"/artifacts/agent/karmada-agent.yaml "${REPO_ROOT}"/artifacts/agent/karmada-agent.yaml.tmp
sed -i "s/{{member_cluster_name}}/${MEMBER_CLUSTER_NAME}/g" "${REPO_ROOT}"/artifacts/agent/karmada-agent.yaml
kubectl apply -f "${REPO_ROOT}/artifacts/agent/karmada-agent.yaml"
mv "${REPO_ROOT}"/artifacts/agent/karmada-agent.yaml.tmp "${REPO_ROOT}"/artifacts/agent/karmada-agent.yaml
# Wait for karmada-etcd to come up before launching the rest of the components.
util::wait_pod_ready ${AGENT_POD_LABEL} "karmada-system"

View File

@ -26,15 +26,7 @@ function usage() {
# generate a secret to store the certificates
function generate_cert_secret {
local karmada_crt_file=${CERT_DIR}/karmada.crt
local karmada_key_file=${CERT_DIR}/karmada.key
sudo chmod 0644 ${karmada_crt_file}
sudo chmod 0644 ${karmada_key_file}
local karmada_ca=$(sudo cat ${ROOT_CA_FILE} | base64 | tr "\n" " "|sed s/[[:space:]]//g)
local karmada_crt=$(sudo cat ${karmada_crt_file} | base64 | tr "\n" " "|sed s/[[:space:]]//g)
local karmada_key=$(sudo cat ${karmada_key_file} | base64 | tr "\n" " "|sed s/[[:space:]]//g)
local karmada_ca=$(base64 "${ROOT_CA_FILE}" | tr -d '\r\n')
local TEMP_PATH=$(mktemp -d)
cp -rf ${REPO_ROOT}/artifacts/deploy/karmada-cert-secret.yaml ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
@ -42,15 +34,15 @@ function generate_cert_secret {
cp -rf ${REPO_ROOT}/artifacts/deploy/karmada-webhook-cert-secret.yaml ${TEMP_PATH}/karmada-webhook-cert-secret-tmp.yaml
sed -i "s/{{ca_crt}}/${karmada_ca}/g" ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
sed -i "s/{{client_cer}}/${karmada_crt}/g" ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
sed -i "s/{{client_key}}/${karmada_key}/g" ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
sed -i "s/{{client_cer}}/${KARMADA_CRT}/g" ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
sed -i "s/{{client_key}}/${KARMADA_KEY}/g" ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
sed -i "s/{{ca_crt}}/${karmada_ca}/g" ${TEMP_PATH}/secret-tmp.yaml
sed -i "s/{{client_cer}}/${karmada_crt}/g" ${TEMP_PATH}/secret-tmp.yaml
sed -i "s/{{client_key}}/${karmada_key}/g" ${TEMP_PATH}/secret-tmp.yaml
sed -i "s/{{client_cer}}/${KARMADA_CRT}/g" ${TEMP_PATH}/secret-tmp.yaml
sed -i "s/{{client_key}}/${KARMADA_KEY}/g" ${TEMP_PATH}/secret-tmp.yaml
sed -i "s/{{server_key}}/${karmada_key}/g" ${TEMP_PATH}/karmada-webhook-cert-secret-tmp.yaml
sed -i "s/{{server_certificate}}/${karmada_crt}/g" ${TEMP_PATH}/karmada-webhook-cert-secret-tmp.yaml
sed -i "s/{{server_key}}/${KARMADA_KEY}/g" ${TEMP_PATH}/karmada-webhook-cert-secret-tmp.yaml
sed -i "s/{{server_certificate}}/${KARMADA_CRT}/g" ${TEMP_PATH}/karmada-webhook-cert-secret-tmp.yaml
kubectl apply -f ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
kubectl apply -f ${TEMP_PATH}/secret-tmp.yaml
@ -85,7 +77,9 @@ util::create_signing_certkey "${CONTROLPLANE_SUDO}" "${CERT_DIR}" server '"clien
util::create_certkey "${CONTROLPLANE_SUDO}" "${CERT_DIR}" "server-ca" karmada system:admin kubernetes.default.svc "*.etcd.karmada-system.svc.cluster.local" "*.karmada-system.svc.cluster.local" "*.karmada-system.svc" "localhost" "127.0.0.1"
KARMADA_APISERVER_IP=$(docker inspect --format='{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "${HOST_CLUSTER_NAME}-control-plane")
util::write_client_kubeconfig "${CONTROLPLANE_SUDO}" "${CERT_DIR}" "${ROOT_CA_FILE}" "${KARMADA_APISERVER_IP}" "${KARMADA_APISERVER_SECURE_PORT}" karmada-apiserver
KARMADA_CRT=$(sudo base64 "${CERT_DIR}/karmada.crt" | tr -d '\r\n')
KARMADA_KEY=$(sudo base64 "${CERT_DIR}/karmada.key" | tr -d '\r\n')
util::write_client_kubeconfig "${CONTROLPLANE_SUDO}" "${CERT_DIR}" "${KARMADA_CRT}" "${KARMADA_KEY}" "${KARMADA_APISERVER_IP}" "${KARMADA_APISERVER_SECURE_PORT}" karmada-apiserver
export KUBECONFIG="${HOST_CLUSTER_KUBECONFIG}"

View File

@ -33,15 +33,7 @@ KIND_LOG_FILE=${KIND_LOG_FILE:-"/tmp/karmada"}
# generate a secret to store the certificates
function generate_cert_secret {
local karmada_crt_file=${CERT_DIR}/karmada.crt
local karmada_key_file=${CERT_DIR}/karmada.key
sudo chmod 0644 ${karmada_crt_file}
sudo chmod 0644 ${karmada_key_file}
local karmada_ca=$(sudo cat ${ROOT_CA_FILE} | base64 | tr "\n" " "|sed s/[[:space:]]//g)
local karmada_crt=$(sudo cat ${karmada_crt_file} | base64 | tr "\n" " "|sed s/[[:space:]]//g)
local karmada_key=$(sudo cat ${karmada_key_file} | base64 | tr "\n" " "|sed s/[[:space:]]//g)
local TEMP_PATH=$(mktemp -d)
cp -rf ${REPO_ROOT}/artifacts/deploy/karmada-cert-secret.yaml ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
@ -49,15 +41,15 @@ function generate_cert_secret {
cp -rf ${REPO_ROOT}/artifacts/deploy/karmada-webhook-cert-secret.yaml ${TEMP_PATH}/karmada-webhook-cert-secret-tmp.yaml
sed -i "s/{{ca_crt}}/${karmada_ca}/g" ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
sed -i "s/{{client_cer}}/${karmada_crt}/g" ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
sed -i "s/{{client_key}}/${karmada_key}/g" ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
sed -i "s/{{client_cer}}/${KARMADA_CRT}/g" ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
sed -i "s/{{client_key}}/${KARMADA_KEY}/g" ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
sed -i "s/{{ca_crt}}/${karmada_ca}/g" ${TEMP_PATH}/secret-tmp.yaml
sed -i "s/{{client_cer}}/${karmada_crt}/g" ${TEMP_PATH}/secret-tmp.yaml
sed -i "s/{{client_key}}/${karmada_key}/g" ${TEMP_PATH}/secret-tmp.yaml
sed -i "s/{{client_cer}}/${KARMADA_CRT}/g" ${TEMP_PATH}/secret-tmp.yaml
sed -i "s/{{client_key}}/${KARMADA_KEY}/g" ${TEMP_PATH}/secret-tmp.yaml
sed -i "s/{{server_key}}/${karmada_key}/g" ${TEMP_PATH}/karmada-webhook-cert-secret-tmp.yaml
sed -i "s/{{server_certificate}}/${karmada_crt}/g" ${TEMP_PATH}/karmada-webhook-cert-secret-tmp.yaml
sed -i "s/{{server_key}}/${KARMADA_KEY}/g" ${TEMP_PATH}/karmada-webhook-cert-secret-tmp.yaml
sed -i "s/{{server_certificate}}/${KARMADA_CRT}/g" ${TEMP_PATH}/karmada-webhook-cert-secret-tmp.yaml
kubectl apply -f ${TEMP_PATH}/karmada-cert-secret-tmp.yaml
kubectl apply -f ${TEMP_PATH}/secret-tmp.yaml
@ -120,7 +112,9 @@ kind load docker-image "${REGISTRY}/karmada-webhook:${VERSION}" --name="${HOST_C
#step6. generate kubeconfig and cert secret
KARMADA_APISERVER_IP=$(docker inspect --format='{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "${HOST_CLUSTER_NAME}-control-plane")
util::write_client_kubeconfig "${CONTROLPLANE_SUDO}" "${CERT_DIR}" "${ROOT_CA_FILE}" "${KARMADA_APISERVER_IP}" "${KARMADA_APISERVER_SECURE_PORT}" karmada-apiserver
KARMADA_CRT=$(sudo base64 "${CERT_DIR}/karmada.crt" | tr -d '\r\n')
KARMADA_KEY=$(sudo base64 "${CERT_DIR}/karmada.key" | tr -d '\r\n')
util::write_client_kubeconfig "${CONTROLPLANE_SUDO}" "${CERT_DIR}" "${KARMADA_CRT}" "${KARMADA_KEY}" "${KARMADA_APISERVER_IP}" "${KARMADA_APISERVER_SECURE_PORT}" karmada-apiserver
#step7. install karmada control plane components
export KUBECONFIG="${HOST_CLUSTER_KUBECONFIG}"

View File

@ -100,15 +100,16 @@ function util::create_certkey {
EOF
}
# util::write_client_kubeconfig creates a self-contained kubeconfig: args are sudo, dest-dir, ca file, host, port, client id, token(optional)
# util::write_client_kubeconfig creates a self-contained kubeconfig: args are sudo, dest-dir, client certificate data, client key data, host, port, client id, token(optional)
function util::write_client_kubeconfig {
local sudo=$1
local dest_dir=$2
local ca_file=$3
local api_host=$4
local api_port=$5
local client_id=$6
local token=${7:-}
local client_certificate_data=$3
local client_key_data=$4
local api_host=$5
local api_port=$6
local client_id=$7
local token=${8:-}
cat <<EOF | ${sudo} tee "${dest_dir}"/"${client_id}".config > /dev/null
apiVersion: v1
kind: Config
@ -120,8 +121,8 @@ clusters:
users:
- user:
token: ${token}
client-certificate: ${dest_dir}/karmada.crt
client-key: ${dest_dir}/karmada.key
client-certificate-data: ${client_certificate_data}
client-key-data: ${client_key_data}
name: karmada-apiserver
contexts:
- context:

View File

@ -31,11 +31,13 @@ const (
// Controller is to sync Work.
type Controller struct {
client.Client // used to operate Work resources.
KubeClientSet kubernetes.Interface // used to get kubernetes resources.
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
ObjectWatcher objectwatcher.ObjectWatcher
client.Client // used to operate Work resources.
KubeClientSet kubernetes.Interface // used to get kubernetes resources.
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
ObjectWatcher objectwatcher.ObjectWatcher
PredicateFunc predicate.Predicate
ClusterClientSetFunc func(c *v1alpha1.Cluster, client kubernetes.Interface) (*util.DynamicClusterClient, error)
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
@ -54,10 +56,22 @@ func (c *Controller) Reconcile(req controllerruntime.Request) (controllerruntime
return controllerruntime.Result{Requeue: true}, err
}
clusterName, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", work.Namespace, work.Name)
return controllerruntime.Result{Requeue: true}, err
}
cluster, err := util.GetCluster(c.Client, clusterName)
if err != nil {
klog.Errorf("Failed to get the given member cluster %s", clusterName)
return controllerruntime.Result{Requeue: true}, err
}
if !work.DeletionTimestamp.IsZero() {
applied := c.isResourceApplied(&work.Status)
if applied {
err := c.tryDeleteWorkload(work)
err := c.tryDeleteWorkload(cluster, work)
if err != nil {
klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return controllerruntime.Result{Requeue: true}, err
@ -66,7 +80,7 @@ func (c *Controller) Reconcile(req controllerruntime.Request) (controllerruntime
return c.removeFinalizer(work)
}
return c.syncWork(work)
return c.syncWork(cluster, work)
}
// SetupWithManager creates a controller and register to controller manager.
@ -74,13 +88,19 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).
For(&workv1alpha1.Work{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
WithEventFilter(c.PredicateFunc).
Complete(c)
}
func (c *Controller) syncWork(work *workv1alpha1.Work) (controllerruntime.Result, error) {
err := c.dispatchWork(work)
func (c *Controller) syncWork(cluster *v1alpha1.Cluster, work *workv1alpha1.Work) (controllerruntime.Result, error) {
if !util.IsClusterReady(&cluster.Status) {
klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
}
err := c.syncToClusters(cluster, work)
if err != nil {
klog.Errorf("Failed to dispatch work %q, namespace is %v, err is %v", work.Name, work.Namespace, err)
klog.Errorf("Failed to sync work(%s) to cluster(%s): %v", work.Name, cluster.Name, err)
return controllerruntime.Result{Requeue: true}, err
}
@ -101,19 +121,7 @@ func (c *Controller) isResourceApplied(workStatus *workv1alpha1.WorkStatus) bool
// tryDeleteWorkload tries to delete resource in the given member cluster.
// Abort deleting when the member cluster is unready, otherwise we can't unjoin the member cluster when the member cluster is unready
func (c *Controller) tryDeleteWorkload(work *workv1alpha1.Work) error {
clusterName, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", work.Namespace, work.Name)
return err
}
cluster, err := util.GetCluster(c.Client, clusterName)
if err != nil {
klog.Errorf("Failed to get the given member cluster %s", clusterName)
return err
}
func (c *Controller) tryDeleteWorkload(cluster *v1alpha1.Cluster, work *workv1alpha1.Work) error {
// Do not clean up resource in the given member cluster if the status of the given member cluster is unready
if !util.IsClusterReady(&cluster.Status) {
klog.Infof("Do not clean up resource in the given member cluster if the status of the given member cluster %s is unready", cluster.Name)
@ -128,7 +136,7 @@ func (c *Controller) tryDeleteWorkload(work *workv1alpha1.Work) error {
return err
}
err = c.ObjectWatcher.Delete(clusterName, workload)
err = c.ObjectWatcher.Delete(cluster, workload)
if err != nil {
klog.Errorf("Failed to delete resource in the given member cluster %v, err is %v", cluster.Name, err)
return err
@ -138,85 +146,6 @@ func (c *Controller) tryDeleteWorkload(work *workv1alpha1.Work) error {
return nil
}
func (c *Controller) dispatchWork(work *workv1alpha1.Work) error {
clusterName, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get member cluster name for work %s/%s", work.Namespace, work.Name)
return err
}
cluster, err := util.GetCluster(c.Client, clusterName)
if err != nil {
klog.Errorf("Failed to the get given member cluster %s", clusterName)
return err
}
if !util.IsClusterReady(&cluster.Status) {
klog.Errorf("The status of the given member cluster %s is unready", cluster.Name)
return fmt.Errorf("cluster %s is not ready, requeuing operation until cluster state is ready", cluster.Name)
}
err = c.syncToClusters(cluster, work)
if err != nil {
klog.Errorf("Failed to dispatch work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return err
}
return nil
}
// syncToClusters ensures that the state of the given object is synchronized to member clusters.
func (c *Controller) syncToClusters(cluster *v1alpha1.Cluster, work *workv1alpha1.Work) error {
clusterDynamicClient, err := util.NewClusterDynamicClientSet(cluster, c.KubeClientSet)
if err != nil {
return err
}
for _, manifest := range work.Spec.Workload.Manifests {
workload := &unstructured.Unstructured{}
err := workload.UnmarshalJSON(manifest.Raw)
if err != nil {
klog.Errorf("failed to unmarshal workload, error is: %v", err)
return err
}
applied := c.isResourceApplied(&work.Status)
if applied {
// todo: get clusterObj from cache
dynamicResource, err := restmapper.GetGroupVersionResource(c.RESTMapper, workload.GroupVersionKind())
if err != nil {
klog.Errorf("Failed to get resource(%s/%s) as mapping GVK to GVR failed: %v", workload.GetNamespace(), workload.GetName(), err)
return err
}
clusterObj, err := clusterDynamicClient.DynamicClientSet.Resource(dynamicResource).Namespace(workload.GetNamespace()).Get(context.TODO(), workload.GetName(), metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get resource %v from member cluster, err is %v ", workload.GetName(), err)
return err
}
err = c.ObjectWatcher.Update(cluster.Name, workload, clusterObj)
if err != nil {
klog.Errorf("Failed to update resource in the given member cluster %s, err is %v", cluster.Name, err)
return err
}
} else {
err = c.ObjectWatcher.Create(cluster.Name, workload)
if err != nil {
klog.Errorf("Failed to create resource in the given member cluster %s, err is %v", cluster.Name, err)
return err
}
err := c.updateAppliedCondition(work)
if err != nil {
klog.Errorf("Failed to update applied status for given work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return err
}
}
}
return nil
}
// removeFinalizer remove finalizer from the given Work
func (c *Controller) removeFinalizer(work *workv1alpha1.Work) (controllerruntime.Result, error) {
if !controllerutil.ContainsFinalizer(work, util.ExecutionControllerFinalizer) {
@ -231,6 +160,81 @@ func (c *Controller) removeFinalizer(work *workv1alpha1.Work) (controllerruntime
return controllerruntime.Result{}, nil
}
// syncToClusters ensures that the state of the given object is synchronized to member clusters.
func (c *Controller) syncToClusters(cluster *v1alpha1.Cluster, work *workv1alpha1.Work) error {
clusterDynamicClient, err := c.ClusterClientSetFunc(cluster, c.KubeClientSet)
if err != nil {
return err
}
for _, manifest := range work.Spec.Workload.Manifests {
workload := &unstructured.Unstructured{}
err := workload.UnmarshalJSON(manifest.Raw)
if err != nil {
klog.Errorf("failed to unmarshal workload, error is: %v", err)
return err
}
applied := c.isResourceApplied(&work.Status)
if applied {
err = c.tryUpdateWorkload(cluster, workload, clusterDynamicClient)
if err != nil {
klog.Errorf("Failed to update resource in the given member cluster %s, err is %v", cluster.Name, err)
return err
}
} else {
err = c.tryCreateWorkload(cluster, workload)
if err != nil {
klog.Errorf("Failed to create resource in the given member cluster %s, err is %v", cluster.Name, err)
return err
}
}
}
err = c.updateAppliedCondition(work)
if err != nil {
klog.Errorf("Failed to update applied status for given work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return err
}
return nil
}
func (c *Controller) tryUpdateWorkload(cluster *v1alpha1.Cluster, workload *unstructured.Unstructured, clusterDynamicClient *util.DynamicClusterClient) error {
// todo: get clusterObj from cache
dynamicResource, err := restmapper.GetGroupVersionResource(c.RESTMapper, workload.GroupVersionKind())
if err != nil {
klog.Errorf("Failed to get resource(%s/%s) as mapping GVK to GVR failed: %v", workload.GetNamespace(), workload.GetName(), err)
return err
}
clusterObj, err := clusterDynamicClient.DynamicClientSet.Resource(dynamicResource).Namespace(workload.GetNamespace()).Get(context.TODO(), workload.GetName(), metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("Failed to get resource %v from member cluster, err is %v ", workload.GetName(), err)
return err
}
return c.tryCreateWorkload(cluster, workload)
}
err = c.ObjectWatcher.Update(cluster, workload, clusterObj)
if err != nil {
klog.Errorf("Failed to update resource in the given member cluster %s, err is %v", cluster.Name, err)
return err
}
return nil
}
func (c *Controller) tryCreateWorkload(cluster *v1alpha1.Cluster, workload *unstructured.Unstructured) error {
err := c.ObjectWatcher.Create(cluster, workload)
if err != nil {
klog.Errorf("Failed to create resource in the given member cluster %s, err is %v", cluster.Name, err)
return err
}
return nil
}
// updateAppliedCondition update the Applied condition for the given Work
func (c *Controller) updateAppliedCondition(work *workv1alpha1.Work) error {
newWorkAppliedCondition := metav1.Condition{

View File

@ -16,6 +16,7 @@ import (
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
@ -34,9 +35,11 @@ const (
// ClusterStatusController is to sync status of Cluster.
type ClusterStatusController struct {
client.Client // used to operate Cluster resources.
KubeClientSet kubernetes.Interface // used to get kubernetes resources.
EventRecorder record.EventRecorder
client.Client // used to operate Cluster resources.
KubeClientSet kubernetes.Interface // used to get kubernetes resources.
EventRecorder record.EventRecorder
PredicateFunc predicate.Predicate
ClusterClientSetFunc func(c *v1alpha1.Cluster, client kubernetes.Interface) (*util.ClusterClient, error)
}
// Reconcile syncs status of the given member cluster.
@ -71,12 +74,12 @@ func (c *ClusterStatusController) Reconcile(req controllerruntime.Request) (cont
// SetupWithManager creates a controller and register to controller manager.
func (c *ClusterStatusController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&v1alpha1.Cluster{}).Complete(c)
return controllerruntime.NewControllerManagedBy(mgr).For(&v1alpha1.Cluster{}).WithEventFilter(c.PredicateFunc).Complete(c)
}
func (c *ClusterStatusController) syncClusterStatus(cluster *v1alpha1.Cluster) (controllerruntime.Result, error) {
// create a ClusterClient for the given member cluster
clusterClient, err := util.NewClusterClientSet(cluster, c.KubeClientSet)
clusterClient, err := c.ClusterClientSetFunc(cluster, c.KubeClientSet)
if err != nil {
klog.Errorf("Failed to create a ClusterClient for the given member cluster: %v, err is : %v", cluster.Name, err)
return controllerruntime.Result{Requeue: true}, err

View File

@ -12,14 +12,15 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
@ -34,17 +35,18 @@ const WorkStatusControllerName = "work-status-controller"
// WorkStatusController is to sync status of Work.
type WorkStatusController struct {
client.Client // used to operate Work resources.
DynamicClient dynamic.Interface // used to fetch arbitrary resources.
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
KubeClientSet kubernetes.Interface // used to get kubernetes resources.
InformerManager informermanager.MultiClusterInformerManager
eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
StopChan <-chan struct{}
WorkerNumber int // WorkerNumber is the number of worker goroutines
worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
ObjectWatcher objectwatcher.ObjectWatcher
client.Client // used to operate Work resources.
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
KubeClientSet kubernetes.Interface // used to get kubernetes resources.
InformerManager informermanager.MultiClusterInformerManager
eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
StopChan <-chan struct{}
WorkerNumber int // WorkerNumber is the number of worker goroutines
worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
ObjectWatcher objectwatcher.ObjectWatcher
PredicateFunc predicate.Predicate
ClusterClientSetFunc func(c *v1alpha1.Cluster, client kubernetes.Interface) (*util.DynamicClusterClient, error)
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
@ -67,13 +69,25 @@ func (c *WorkStatusController) Reconcile(req controllerruntime.Request) (control
return controllerruntime.Result{}, nil
}
return c.buildResourceInformers(work)
clusterName, err := names.GetClusterName(work.GetNamespace())
if err != nil {
klog.Errorf("Failed to get member cluster name by %s. Error: %v.", work.GetNamespace(), err)
return controllerruntime.Result{Requeue: true}, err
}
cluster, err := util.GetCluster(c.Client, clusterName)
if err != nil {
klog.Errorf("Failed to the get given member cluster %s", clusterName)
return controllerruntime.Result{Requeue: true}, err
}
return c.buildResourceInformers(cluster, work)
}
// buildResourceInformers builds informer dynamically for managed resources in member cluster.
// The created informer watches resource change and then sync to the relevant Work object.
func (c *WorkStatusController) buildResourceInformers(work *workv1alpha1.Work) (controllerruntime.Result, error) {
err := c.registerInformersAndStart(work)
func (c *WorkStatusController) buildResourceInformers(cluster *v1alpha1.Cluster, work *workv1alpha1.Work) (controllerruntime.Result, error) {
err := c.registerInformersAndStart(cluster, work)
if err != nil {
klog.Errorf("Failed to register informer for Work %s/%s. Error: %v.", work.GetNamespace(), work.GetName(), err)
return controllerruntime.Result{Requeue: true}, err
@ -148,14 +162,20 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
return err
}
cluster, err := util.GetCluster(c.Client, clusterName)
if err != nil {
klog.Errorf("Failed to the get given member cluster %s", clusterName)
return err
}
// compare version to determine if need to update resource
needUpdate, err := c.ObjectWatcher.NeedsUpdate(clusterName, desireObj, obj)
needUpdate, err := c.ObjectWatcher.NeedsUpdate(cluster, desireObj, obj)
if err != nil {
return err
}
if needUpdate {
return c.ObjectWatcher.Update(clusterName, desireObj, obj)
return c.ObjectWatcher.Update(cluster, desireObj, obj)
}
klog.Infof("reflecting %s(%s/%s) status to Work(%s/%s)", obj.GetKind(), obj.GetNamespace(), obj.GetName(), workNamespace, workName)
@ -207,7 +227,12 @@ func (c *WorkStatusController) recreateResourceIfNeeded(work *workv1alpha1.Work,
manifest.GetNamespace() == clusterWorkload.Namespace &&
manifest.GetName() == clusterWorkload.Name {
klog.Infof("recreating %s/%s/%s in member cluster %s", clusterWorkload.GVK.Kind, clusterWorkload.Namespace, clusterWorkload.Name, clusterWorkload.Cluster)
return c.ObjectWatcher.Create(clusterWorkload.Cluster, manifest)
cluster, err := util.GetCluster(c.Client, clusterWorkload.Cluster)
if err != nil {
klog.Errorf("Failed to the get given member cluster %s", clusterWorkload.Cluster)
return err
}
return c.ObjectWatcher.Create(cluster, manifest)
}
}
return nil
@ -344,14 +369,8 @@ func (c *WorkStatusController) getObjectFromCache(key string) (*unstructured.Uns
// registerInformersAndStart builds informer manager for cluster if it doesn't exist, then constructs informers for gvr
// and start it.
func (c *WorkStatusController) registerInformersAndStart(work *workv1alpha1.Work) error {
clusterName, err := names.GetClusterName(work.GetNamespace())
if err != nil {
klog.Errorf("Failed to get member cluster name by %s. Error: %v.", work.GetNamespace(), err)
return err
}
singleClusterInformerManager, err := c.getSingleClusterManager(clusterName)
func (c *WorkStatusController) registerInformersAndStart(cluster *v1alpha1.Cluster, work *workv1alpha1.Work) error {
singleClusterInformerManager, err := c.getSingleClusterManager(cluster)
if err != nil {
return err
}
@ -365,11 +384,11 @@ func (c *WorkStatusController) registerInformersAndStart(work *workv1alpha1.Work
singleClusterInformerManager.ForResource(gvr, c.getEventHandler())
}
c.InformerManager.Start(clusterName, c.StopChan)
synced := c.InformerManager.WaitForCacheSync(clusterName, c.StopChan)
c.InformerManager.Start(cluster.Name, c.StopChan)
synced := c.InformerManager.WaitForCacheSync(cluster.Name, c.StopChan)
if synced == nil {
klog.Errorf("No informerFactory for cluster %s exist.", clusterName)
return fmt.Errorf("no informerFactory for cluster %s exist", clusterName)
klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name)
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
}
for gvr := range gvrTargets {
if !synced[gvr] {
@ -402,14 +421,14 @@ func (c *WorkStatusController) getGVRsFromWork(work *workv1alpha1.Work) (map[sch
// getSingleClusterManager gets singleClusterInformerManager with clusterName.
// If manager is not exist, create it, otherwise gets it from map.
func (c *WorkStatusController) getSingleClusterManager(clusterName string) (informermanager.SingleClusterInformerManager, error) {
func (c *WorkStatusController) getSingleClusterManager(cluster *v1alpha1.Cluster) (informermanager.SingleClusterInformerManager, error) {
// TODO(chenxianpao): If cluster A is removed, then a new cluster that name also is A joins karmada,
// the cache in informer manager should be updated.
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(clusterName)
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := util.BuildDynamicClusterClient(c.Client, c.KubeClientSet, clusterName)
dynamicClusterClient, err := c.ClusterClientSetFunc(cluster, c.KubeClientSet)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName)
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return nil, err
}
singleClusterInformerManager = c.InformerManager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0)
@ -419,5 +438,5 @@ func (c *WorkStatusController) getSingleClusterManager(clusterName string) (info
// SetupWithManager creates a controller and register to controller manager.
func (c *WorkStatusController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).Complete(c)
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).Complete(c)
}

View File

@ -267,7 +267,7 @@ func RunJoin(cmdOut io.Writer, karmadaConfig KarmadaConfig, opts CommandJoinOpti
clusterObj.Spec.InsecureSkipTLSVerification = true
}
cluster, err := createClusterObject(controlPlaneKarmadaClient, clusterObj, false)
cluster, err := CreateClusterObject(controlPlaneKarmadaClient, clusterObj, false)
if err != nil {
klog.Errorf("failed to create cluster object. cluster name: %s, error: %v", opts.ClusterName, err)
return err
@ -398,7 +398,8 @@ func ensureClusterRoleBindingExist(client kubeclient.Interface, clusterRoleBindi
return createdObj, nil
}
func createClusterObject(controlPlaneClient *karmadaclientset.Clientset, clusterObj *clusterv1alpha1.Cluster, errorOnExisting bool) (*clusterv1alpha1.Cluster, error) {
// CreateClusterObject create cluster object in karmada control plane
func CreateClusterObject(controlPlaneClient *karmadaclientset.Clientset, clusterObj *clusterv1alpha1.Cluster, errorOnExisting bool) (*clusterv1alpha1.Cluster, error) {
cluster, exist, err := GetCluster(controlPlaneClient, clusterObj.Namespace, clusterObj.Name)
if err != nil {
klog.Errorf("failed to create cluster object. cluster: %s/%s, error: %v", clusterObj.Namespace, clusterObj.Name, err)

View File

@ -9,6 +9,7 @@ import (
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
controllerruntime "sigs.k8s.io/controller-runtime"
"github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
)
@ -48,6 +49,20 @@ func NewClusterClientSet(c *v1alpha1.Cluster, client kubeclientset.Interface) (*
return &clusterClientSet, nil
}
// NewClusterClientSetForAgent returns a ClusterClient for the given member cluster which will be used in karmada agent.
func NewClusterClientSetForAgent(c *v1alpha1.Cluster, client kubeclientset.Interface) (*ClusterClient, error) {
clusterConfig, err := controllerruntime.GetConfig()
if err != nil {
return nil, fmt.Errorf("error building kubeconfig of member cluster: %s", err.Error())
}
var clusterClientSet = ClusterClient{ClusterName: c.Name}
if clusterConfig != nil {
clusterClientSet.KubeClient = kubeclientset.NewForConfigOrDie(clusterConfig)
}
return &clusterClientSet, nil
}
// NewClusterDynamicClientSet returns a dynamic client for the given member cluster.
func NewClusterDynamicClientSet(c *v1alpha1.Cluster, client kubeclientset.Interface) (*DynamicClusterClient, error) {
clusterConfig, err := buildClusterConfig(c, client)
@ -62,6 +77,20 @@ func NewClusterDynamicClientSet(c *v1alpha1.Cluster, client kubeclientset.Interf
return &clusterClientSet, nil
}
// NewClusterDynamicClientSetForAgent returns a dynamic client for the given member cluster which will be used in karmada agent.
func NewClusterDynamicClientSetForAgent(c *v1alpha1.Cluster, client kubeclientset.Interface) (*DynamicClusterClient, error) {
clusterConfig, err := controllerruntime.GetConfig()
if err != nil {
return nil, fmt.Errorf("error building kubeconfig of member cluster: %s", err.Error())
}
var clusterClientSet = DynamicClusterClient{ClusterName: c.Name}
if clusterConfig != nil {
clusterClientSet.DynamicClientSet = dynamic.NewForConfigOrDie(clusterConfig)
}
return &clusterClientSet, nil
}
func buildClusterConfig(cluster *v1alpha1.Cluster, client kubeclientset.Interface) (*rest.Config, error) {
clusterName := cluster.Name
apiEndpoint := cluster.Spec.APIEndpoint

View File

@ -14,8 +14,8 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)
@ -27,34 +27,37 @@ const (
// ObjectWatcher manages operations for object dispatched to member clusters.
type ObjectWatcher interface {
Create(clusterName string, desireObj *unstructured.Unstructured) error
Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error
Delete(clusterName string, desireObj *unstructured.Unstructured) error
NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) (bool, error)
Create(cluster *v1alpha1.Cluster, desireObj *unstructured.Unstructured) error
Update(cluster *v1alpha1.Cluster, desireObj, clusterObj *unstructured.Unstructured) error
Delete(cluster *v1alpha1.Cluster, desireObj *unstructured.Unstructured) error
NeedsUpdate(cluster *v1alpha1.Cluster, desiredObj, clusterObj *unstructured.Unstructured) (bool, error)
}
// ClientSetFunc is used to generate client set of member cluster
type ClientSetFunc func(c *v1alpha1.Cluster, client kubernetes.Interface) (*util.DynamicClusterClient, error)
type objectWatcherImpl struct {
client.Client
KubeClientSet kubernetes.Interface
VersionRecord map[string]map[string]string
RESTMapper meta.RESTMapper
Lock sync.RWMutex
Lock sync.RWMutex
RESTMapper meta.RESTMapper
KubeClientSet kubernetes.Interface
VersionRecord map[string]map[string]string
ClusterClientSetFunc ClientSetFunc
}
// NewObjectWatcher returns a instance of ObjectWatcher
func NewObjectWatcher(client client.Client, kubeClientSet kubernetes.Interface, restMapper meta.RESTMapper) ObjectWatcher {
func NewObjectWatcher(kubeClientSet kubernetes.Interface, restMapper meta.RESTMapper, clusterClientSetFunc ClientSetFunc) ObjectWatcher {
return &objectWatcherImpl{
Client: client,
KubeClientSet: kubeClientSet,
VersionRecord: make(map[string]map[string]string),
RESTMapper: restMapper,
KubeClientSet: kubeClientSet,
VersionRecord: make(map[string]map[string]string),
RESTMapper: restMapper,
ClusterClientSetFunc: clusterClientSetFunc,
}
}
func (o *objectWatcherImpl) Create(clusterName string, desireObj *unstructured.Unstructured) error {
dynamicClusterClient, err := util.BuildDynamicClusterClient(o.Client, o.KubeClientSet, clusterName)
func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstructured.Unstructured) error {
dynamicClusterClient, err := o.ClusterClientSetFunc(cluster, o.KubeClientSet)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName)
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return err
}
@ -72,17 +75,17 @@ func (o *objectWatcherImpl) Create(clusterName string, desireObj *unstructured.U
klog.Errorf("Failed to create resource %v, err is %v ", desireObj.GetName(), err)
return err
}
klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName)
klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name)
// record version
o.recordVersion(clusterObj, dynamicClusterClient.ClusterName)
return nil
}
func (o *objectWatcherImpl) Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error {
dynamicClusterClient, err := util.BuildDynamicClusterClient(o.Client, o.KubeClientSet, clusterName)
func (o *objectWatcherImpl) Update(cluster *v1alpha1.Cluster, desireObj, clusterObj *unstructured.Unstructured) error {
dynamicClusterClient, err := o.ClusterClientSetFunc(cluster, o.KubeClientSet)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName)
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return err
}
@ -104,17 +107,17 @@ func (o *objectWatcherImpl) Update(clusterName string, desireObj, clusterObj *un
return err
}
klog.Infof("Updated resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName)
klog.Infof("Updated resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name)
// record version
o.recordVersion(resource, clusterName)
o.recordVersion(resource, cluster.Name)
return nil
}
func (o *objectWatcherImpl) Delete(clusterName string, desireObj *unstructured.Unstructured) error {
dynamicClusterClient, err := util.BuildDynamicClusterClient(o.Client, o.KubeClientSet, clusterName)
func (o *objectWatcherImpl) Delete(cluster *v1alpha1.Cluster, desireObj *unstructured.Unstructured) error {
dynamicClusterClient, err := o.ClusterClientSetFunc(cluster, o.KubeClientSet)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName)
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return err
}
@ -132,7 +135,7 @@ func (o *objectWatcherImpl) Delete(clusterName string, desireObj *unstructured.U
klog.Errorf("Failed to delete resource %v, err is %v ", desireObj.GetName(), err)
return err
}
klog.Infof("Deleted resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName)
klog.Infof("Deleted resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name)
objectKey := o.genObjectKey(desireObj)
o.deleteVersionRecord(dynamicClusterClient.ClusterName, objectKey)
@ -195,9 +198,9 @@ func (o *objectWatcherImpl) deleteVersionRecord(clusterName, resourceName string
delete(o.VersionRecord[clusterName], resourceName)
}
func (o *objectWatcherImpl) NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) (bool, error) {
func (o *objectWatcherImpl) NeedsUpdate(cluster *v1alpha1.Cluster, desiredObj, clusterObj *unstructured.Unstructured) (bool, error) {
// get resource version
version, exist := o.getVersionRecord(clusterName, desiredObj.GroupVersionKind().String()+"/"+desiredObj.GetNamespace()+"/"+desiredObj.GetName())
version, exist := o.getVersionRecord(cluster.Name, desiredObj.GroupVersionKind().String()+"/"+desiredObj.GetNamespace()+"/"+desiredObj.GetName())
if !exist {
klog.Errorf("Failed to update resource %v/%v for the version record does not exist", desiredObj.GetNamespace(), desiredObj.GetName())
return false, fmt.Errorf("failed to update resource %v/%v for the version record does not exist", desiredObj.GetNamespace(), desiredObj.GetName())