/*
Copyright 2020 The Karmada Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package status

import (
	"context"
	"fmt"
	"reflect"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/retry"
	"k8s.io/klog/v2"
	controllerruntime "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
	"github.com/karmada-io/karmada/pkg/events"
	"github.com/karmada-io/karmada/pkg/metrics"
	"github.com/karmada-io/karmada/pkg/resourceinterpreter"
	"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
	"github.com/karmada-io/karmada/pkg/util"
	"github.com/karmada-io/karmada/pkg/util/fedinformer"
	"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
	"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
	"github.com/karmada-io/karmada/pkg/util/helper"
	"github.com/karmada-io/karmada/pkg/util/names"
	"github.com/karmada-io/karmada/pkg/util/objectwatcher"
	"github.com/karmada-io/karmada/pkg/util/restmapper"
)

// WorkStatusControllerName is the controller name that will be used when reporting events.
const WorkStatusControllerName = "work-status-controller"

// WorkStatusController is to sync the status of Work objects.
type WorkStatusController struct {
	client.Client   // used to operate Work resources.
	EventRecorder   record.EventRecorder
	RESTMapper      meta.RESTMapper
	InformerManager genericmanager.MultiClusterInformerManager
	eventHandler    cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
	StopChan        <-chan struct{}
	worker          util.AsyncWorker // worker processes resources periodically from the rate-limiting queue.
	// ConcurrentWorkStatusSyncs is the number of Work statuses that are allowed to sync concurrently.
	ConcurrentWorkStatusSyncs   int
	ObjectWatcher               objectwatcher.ObjectWatcher
	PredicateFunc               predicate.Predicate
	ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
	ClusterCacheSyncTimeout     metav1.Duration
	RateLimiterOptions          ratelimiterflag.Options
	ResourceInterpreter         resourceinterpreter.ResourceInterpreter
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (c *WorkStatusController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
	klog.V(4).Infof("Reconciling status of Work %s.", req.NamespacedName.String())

	work := &workv1alpha1.Work{}
	if err := c.Client.Get(ctx, req.NamespacedName, work); err != nil {
		// The resource may no longer exist, in which case we stop processing.
		if apierrors.IsNotFound(err) {
			return controllerruntime.Result{}, nil
		}

		return controllerruntime.Result{}, err
	}

	if !work.DeletionTimestamp.IsZero() {
		return controllerruntime.Result{}, nil
	}

	if !helper.IsResourceApplied(&work.Status) {
		return controllerruntime.Result{}, nil
	}

	clusterName, err := names.GetClusterName(work.GetNamespace())
	if err != nil {
		klog.Errorf("Failed to get member cluster name by %s. Error: %v.", work.GetNamespace(), err)
		return controllerruntime.Result{}, err
	}

	cluster, err := util.GetCluster(c.Client, clusterName)
	if err != nil {
		klog.Errorf("Failed to get the given member cluster %s", clusterName)
		return controllerruntime.Result{}, err
	}

	if !util.IsClusterReady(&cluster.Status) {
		klog.Errorf("Stop syncing the Work(%s/%s) to the cluster(%s) as it is not ready.", work.Namespace, work.Name, cluster.Name)
		return controllerruntime.Result{}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
	}

	return c.buildResourceInformers(cluster, work)
}

// buildResourceInformers dynamically builds informers for the resources managed in the member cluster.
// The created informer watches resource changes and then syncs them to the relevant Work object.
func (c *WorkStatusController) buildResourceInformers(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work) (controllerruntime.Result, error) {
	err := c.registerInformersAndStart(cluster, work)
	if err != nil {
		klog.Errorf("Failed to register informer for Work %s/%s. Error: %v.", work.GetNamespace(), work.GetName(), err)
		return controllerruntime.Result{}, err
	}
	return controllerruntime.Result{}, nil
}

// getEventHandler returns a callback function that knows how to handle events from the member cluster.
func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {
	if c.eventHandler == nil {
		c.eventHandler = fedinformer.NewHandlerOnAllEvents(c.worker.Enqueue)
	}
	return c.eventHandler
}

// RunWorkQueue initializes the worker and runs it; the worker will process resources asynchronously.
func (c *WorkStatusController) RunWorkQueue() {
	workerOptions := util.Options{
		Name:          "work-status",
		KeyFunc:       generateKey,
		ReconcileFunc: c.syncWorkStatus,
	}
	c.worker = util.NewAsyncWorker(workerOptions)
	c.worker.Run(c.ConcurrentWorkStatusSyncs, c.StopChan)
}
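
// The pipeline wired up above works roughly as follows (an illustrative
// sketch of the data flow, not code that exists in this package): every
// add/update/delete event observed by a member-cluster informer is enqueued
// via c.worker.Enqueue, turned into a queue key by generateKey, and finally
// handed to syncWorkStatus:
//
//	informer event (unstructured object)
//	    -> generateKey(obj)       // KeyFunc: builds a FederatedKey
//	    -> rate-limiting queue
//	    -> c.syncWorkStatus(key)  // ReconcileFunc: collects and reports status
//
// ConcurrentWorkStatusSyncs bounds how many keys are reconciled in parallel.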

// generateKey generates a key from obj; the key contains cluster, GVK, namespace and name.
func generateKey(obj interface{}) (util.QueueKey, error) {
	resource := obj.(*unstructured.Unstructured)
	cluster, err := getClusterNameFromAnnotation(resource)
	if err != nil {
		return nil, err
	}
	// Return a nil key when the obj is not managed by Karmada; such keys are discarded before being put into the queue.
	if cluster == "" {
		return nil, nil
	}

	return keys.FederatedKeyFunc(cluster, obj)
}
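
// For illustration (a hypothetical object, not a fixture from this repo): a
// Deployment default/nginx observed in cluster member1, carrying the
// work-namespace annotation "karmada-es-member1", would yield a key
// equivalent to:
//
//	keys.FederatedKey{
//		Cluster: "member1",
//		ClusterWideKey: keys.ClusterWideKey{
//			Group:     "apps",
//			Version:   "v1",
//			Kind:      "Deployment",
//			Namespace: "default",
//			Name:      "nginx",
//		},
//	}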

// getClusterNameFromAnnotation gets the cluster name from the work-namespace annotation; if the annotation
// does not exist, the resource was not created by Karmada.
func getClusterNameFromAnnotation(resource *unstructured.Unstructured) (string, error) {
	workNamespace, exist := resource.GetAnnotations()[workv1alpha2.WorkNamespaceAnnotation]
	if !exist {
		klog.V(4).Infof("Ignore resource(kind=%s, %s/%s) which is not managed by Karmada.", resource.GetKind(), resource.GetNamespace(), resource.GetName())
		return "", nil
	}

	cluster, err := names.GetClusterName(workNamespace)
	if err != nil {
		klog.Errorf("Failed to get cluster name from work namespace: %s, error: %v.", workNamespace, err)
		return "", err
	}
	return cluster, nil
}

// syncWorkStatus collects the status of the object referenced by key and updates it to the Work which holds the object.
func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
	ctx := context.Background()
	fedKey, ok := key.(keys.FederatedKey)
	if !ok {
		klog.Errorf("Failed to sync status as invalid key: %v", key)
		return fmt.Errorf("invalid key")
	}

	observedObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey)
	if err != nil {
		if apierrors.IsNotFound(err) {
			return c.handleDeleteEvent(ctx, fedKey)
		}
		return err
	}

	observedAnnotations := observedObj.GetAnnotations()
	workNamespace, nsExist := observedAnnotations[workv1alpha2.WorkNamespaceAnnotation]
	workName, nameExist := observedAnnotations[workv1alpha2.WorkNameAnnotation]
	if !nsExist || !nameExist {
		klog.Infof("Ignore object(%s) which is not managed by Karmada.", fedKey.String())
		return nil
	}

	workObject := &workv1alpha1.Work{}
	if err := c.Client.Get(ctx, client.ObjectKey{Namespace: workNamespace, Name: workName}, workObject); err != nil {
		// Stop processing if the resource no longer exists.
		if apierrors.IsNotFound(err) {
			return nil
		}

		klog.Errorf("Failed to get Work(%s/%s) from cache: %v", workNamespace, workName, err)
		return err
	}

	// Stop updating status if the Work object is in terminating state.
	if !workObject.DeletionTimestamp.IsZero() {
		return nil
	}

	if err := c.updateResource(ctx, observedObj, workObject, fedKey); err != nil {
		return err
	}

	klog.Infof("Reflecting the resource(kind=%s, %s/%s) status to the Work(%s/%s).", observedObj.GetKind(), observedObj.GetNamespace(), observedObj.GetName(), workNamespace, workName)
	return c.reflectStatus(ctx, workObject, observedObj)
}

func (c *WorkStatusController) updateResource(ctx context.Context, observedObj *unstructured.Unstructured, workObject *workv1alpha1.Work, fedKey keys.FederatedKey) error {
	if helper.IsWorkSuspendDispatching(workObject) {
		return nil
	}

	desiredObj, err := c.getRawManifest(workObject.Spec.Workload.Manifests, observedObj)
	if err != nil {
		return err
	}

	clusterName, err := names.GetClusterName(workObject.Namespace)
	if err != nil {
		klog.Errorf("Failed to get member cluster name: %v", err)
		return err
	}

	// We should check whether the observed status is consistent with the declaration to prevent accidental changes
	// made in member clusters.
	needUpdate, err := c.ObjectWatcher.NeedsUpdate(clusterName, desiredObj, observedObj)
	if err != nil {
		return err
	}

	if needUpdate {
		updateErr := c.ObjectWatcher.Update(ctx, clusterName, desiredObj, observedObj)
		metrics.CountUpdateResourceToCluster(updateErr, desiredObj.GetAPIVersion(), desiredObj.GetKind(), clusterName)
		if updateErr != nil {
			klog.Errorf("Updating %s failed: %v", fedKey.String(), updateErr)
			return updateErr
		}
		// We can't return even after a successful update, because that might lose the chance to collect status.
		// Not all updates are real changes; when nothing actually changed there will be no further event for this
		// update. This usually happens with resources that don't enable 'metadata.generation', like 'Service'.
		// When a Service's status changes, its 'metadata.resourceVersion' is increased, but 'metadata.generation'
		// is not (it defaults to 0), so the ObjectWatcher can't easily tell what happened to the object and needs
		// to update again. The update operation will be a no-op for the event triggered by the Service's status
		// change.
	}
	return nil
}
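
// To make the generation point above concrete (illustrative values for a
// hypothetical Service observed as an unstructured object; not taken from a
// real cluster): after a status-only change, generation alone cannot tell a
// spec drift from a status update:
//
//	svc.GetGeneration()       // 0 — Services don't bump metadata.generation
//	svc.GetResourceVersion()  // "4711" — was "4705" before the status update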

func (c *WorkStatusController) handleDeleteEvent(ctx context.Context, key keys.FederatedKey) error {
	executionSpace := names.GenerateExecutionSpaceName(key.Cluster)

	// Given that the workload might have been deleted from the informer cache, we can't get the Work object by its
	// label; we have to get the Work by naming rule, as the Work's name is generated from the workload's kind, name
	// and namespace.
	workName := names.GenerateWorkName(key.Kind, key.Name, key.Namespace)
	work := &workv1alpha1.Work{}
	if err := c.Client.Get(ctx, client.ObjectKey{Namespace: executionSpace, Name: workName}, work); err != nil {
		// Stop processing as the Work object has been removed; assume it's a normal delete operation.
		if apierrors.IsNotFound(err) {
			return nil
		}

		klog.Errorf("Failed to get Work from cache: %v", err)
		return err
	}

	// Stop processing as the Work object is being deleted.
	if !work.DeletionTimestamp.IsZero() {
		return nil
	}

	if util.GetLabelValue(work.Labels, util.PropagationInstruction) == util.PropagationInstructionSuppressed {
		return nil
	}

	reCreateErr := c.recreateResourceIfNeeded(ctx, work, key)
	metrics.CountRecreateResourceToCluster(reCreateErr, key.GroupVersion().String(), key.Kind, key.Cluster)
	if reCreateErr != nil {
		c.updateAppliedCondition(ctx, work, metav1.ConditionFalse, "ReCreateFailed", reCreateErr.Error())
		return reCreateErr
	}
	c.updateAppliedCondition(ctx, work, metav1.ConditionTrue, "ReCreateSuccessful", "Manifest has been successfully applied")
	return nil
}
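
// A hypothetical illustration of the naming rule used above (the exact scheme
// is defined in pkg/util/names, not here): deleting Deployment default/nginx
// from cluster member1 leads to a lookup of the Work named by
//
//	names.GenerateWorkName("Deployment", "nginx", "default")
//
// inside the execution space
//
//	names.GenerateExecutionSpaceName("member1") // "karmada-es-member1"
//
// Both functions are deterministic, so the Work can still be found even
// though the workload itself is already gone from the informer cache.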

func (c *WorkStatusController) recreateResourceIfNeeded(ctx context.Context, work *workv1alpha1.Work, workloadKey keys.FederatedKey) error {
	for _, rawManifest := range work.Spec.Workload.Manifests {
		manifest := &unstructured.Unstructured{}
		if err := manifest.UnmarshalJSON(rawManifest.Raw); err != nil {
			return err
		}

		desiredGVK := schema.FromAPIVersionAndKind(manifest.GetAPIVersion(), manifest.GetKind())
		if reflect.DeepEqual(desiredGVK, workloadKey.GroupVersionKind()) &&
			manifest.GetNamespace() == workloadKey.Namespace &&
			manifest.GetName() == workloadKey.Name {
			klog.Infof("Recreating resource(%s).", workloadKey.String())
			err := c.ObjectWatcher.Create(ctx, workloadKey.Cluster, manifest)
			if err != nil {
				c.eventf(manifest, corev1.EventTypeWarning, events.EventReasonSyncWorkloadFailed, "Failed to create or update resource(%s/%s) in member cluster(%s): %v", manifest.GetNamespace(), manifest.GetName(), workloadKey.Cluster, err)
				return err
			}
			c.eventf(manifest, corev1.EventTypeNormal, events.EventReasonSyncWorkloadSucceed, "Successfully applied resource(%s/%s) to cluster %s", manifest.GetNamespace(), manifest.GetName(), workloadKey.Cluster)
			return nil
		}
	}
	return nil
}

// updateAppliedCondition updates the Applied condition for the given Work.
func (c *WorkStatusController) updateAppliedCondition(ctx context.Context, work *workv1alpha1.Work, status metav1.ConditionStatus, reason, message string) {
	newWorkAppliedCondition := metav1.Condition{
		Type:               workv1alpha1.WorkApplied,
		Status:             status,
		Reason:             reason,
		Message:            message,
		LastTransitionTime: metav1.Now(),
	}

	err := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
		_, err = helper.UpdateStatus(ctx, c.Client, work, func() error {
			meta.SetStatusCondition(&work.Status.Conditions, newWorkAppliedCondition)
			return nil
		})
		return err
	})

	if err != nil {
		klog.Errorf("Failed to update condition of work %s/%s: %s", work.Namespace, work.Name, err.Error())
	}
}

// reflectStatus grabs the cluster object's running status and then updates it to the object's owner Work.
func (c *WorkStatusController) reflectStatus(ctx context.Context, work *workv1alpha1.Work, clusterObj *unstructured.Unstructured) error {
	statusRaw, err := c.ResourceInterpreter.ReflectStatus(clusterObj)
	if err != nil {
		klog.Errorf("Failed to reflect status for object(%s/%s/%s) with resourceInterpreter, err: %v",
			clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName(), err)
		c.EventRecorder.Eventf(work, corev1.EventTypeWarning, events.EventReasonReflectStatusFailed, "Reflect status for object(%s/%s/%s) failed, err: %s.", clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName(), err.Error())
		return err
	}
	c.EventRecorder.Eventf(work, corev1.EventTypeNormal, events.EventReasonReflectStatusSucceed, "Reflect status for object(%s/%s/%s) succeed.", clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName())

	var resourceHealth workv1alpha1.ResourceHealth
	// When an unregistered resource kind is requested with the ResourceInterpreter,
	// the interpreter will return an error; we treat its health status as Unknown.
	healthy, err := c.ResourceInterpreter.InterpretHealth(clusterObj)
	if err != nil {
		resourceHealth = workv1alpha1.ResourceUnknown
		c.EventRecorder.Eventf(work, corev1.EventTypeWarning, events.EventReasonInterpretHealthFailed, "Interpret health of object(%s/%s/%s) failed, err: %s.", clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName(), err.Error())
	} else if healthy {
		resourceHealth = workv1alpha1.ResourceHealthy
		c.EventRecorder.Eventf(work, corev1.EventTypeNormal, events.EventReasonInterpretHealthSucceed, "Interpret health of object(%s/%s/%s) as healthy.", clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName())
	} else {
		resourceHealth = workv1alpha1.ResourceUnhealthy
		c.EventRecorder.Eventf(work, corev1.EventTypeNormal, events.EventReasonInterpretHealthSucceed, "Interpret health of object(%s/%s/%s) as unhealthy.", clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName())
	}

	identifier, err := c.buildStatusIdentifier(work, clusterObj)
	if err != nil {
		return err
	}

	manifestStatus := workv1alpha1.ManifestStatus{
		Identifier: *identifier,
		Status:     statusRaw,
		Health:     resourceHealth,
	}

	return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
		_, err = helper.UpdateStatus(ctx, c.Client, work, func() error {
			work.Status.ManifestStatuses = c.mergeStatus(work.Status.ManifestStatuses, manifestStatus)
			return nil
		})
		return err
	})
}

func (c *WorkStatusController) buildStatusIdentifier(work *workv1alpha1.Work, clusterObj *unstructured.Unstructured) (*workv1alpha1.ResourceIdentifier, error) {
	manifestRef := helper.ManifestReference{
		APIVersion: clusterObj.GetAPIVersion(),
		Kind:       clusterObj.GetKind(),
		Namespace:  clusterObj.GetNamespace(),
		Name:       clusterObj.GetName(),
	}
	ordinal, err := helper.GetManifestIndex(work.Spec.Workload.Manifests, &manifestRef)
	if err != nil {
		return nil, err
	}

	groupVersion, err := schema.ParseGroupVersion(clusterObj.GetAPIVersion())
	if err != nil {
		return nil, err
	}

	identifier := &workv1alpha1.ResourceIdentifier{
		Ordinal: ordinal,
		// TODO(RainbowMango): Consider merging Group and Version to APIVersion from the Work API.
		Group:     groupVersion.Group,
		Version:   groupVersion.Version,
		Kind:      clusterObj.GetKind(),
		Namespace: clusterObj.GetNamespace(),
		Name:      clusterObj.GetName(),
	}

	return identifier, nil
}
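
// For example (hypothetical values, shown for illustration only): a
// Deployment apps/v1 default/nginx that is the first manifest in the Work
// would produce
//
//	workv1alpha1.ResourceIdentifier{
//		Ordinal:   0,
//		Group:     "apps",
//		Version:   "v1",
//		Kind:      "Deployment",
//		Namespace: "default",
//		Name:      "nginx",
//	}
//
// where Ordinal records the manifest's position in work.Spec.Workload.Manifests.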

func (c *WorkStatusController) mergeStatus(_ []workv1alpha1.ManifestStatus, newStatus workv1alpha1.ManifestStatus) []workv1alpha1.ManifestStatus {
	// TODO(RainbowMango): update 'statuses' if 'newStatus' already exists.
	// For now, we have at most one manifest in a Work, so just override the current 'statuses'.
	return []workv1alpha1.ManifestStatus{newStatus}
}

func (c *WorkStatusController) getRawManifest(manifests []workv1alpha1.Manifest, clusterObj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
	for _, rawManifest := range manifests {
		manifest := &unstructured.Unstructured{}
		if err := manifest.UnmarshalJSON(rawManifest.Raw); err != nil {
			return nil, err
		}

		if manifest.GetAPIVersion() == clusterObj.GetAPIVersion() &&
			manifest.GetKind() == clusterObj.GetKind() &&
			manifest.GetNamespace() == clusterObj.GetNamespace() &&
			manifest.GetName() == clusterObj.GetName() {
			return manifest, nil
		}
	}

	return nil, fmt.Errorf("no such manifest exists")
}

// registerInformersAndStart builds an informer manager for the cluster if it doesn't exist, then constructs
// informers for the GVRs and starts them.
func (c *WorkStatusController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster, work *workv1alpha1.Work) error {
	singleClusterInformerManager, err := c.getSingleClusterManager(cluster)
	if err != nil {
		return err
	}

	gvrTargets, err := c.getGVRsFromWork(work)
	if err != nil {
		return err
	}

	allSynced := true
	for gvr := range gvrTargets {
		if !singleClusterInformerManager.IsInformerSynced(gvr) || !singleClusterInformerManager.IsHandlerExist(gvr, c.getEventHandler()) {
			allSynced = false
			singleClusterInformerManager.ForResource(gvr, c.getEventHandler())
		}
	}
	if allSynced {
		return nil
	}

	c.InformerManager.Start(cluster.Name)

	if err := func() error {
		synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, c.ClusterCacheSyncTimeout.Duration)
		if synced == nil {
			return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
		}
		for gvr := range gvrTargets {
			if !synced[gvr] {
				return fmt.Errorf("informer for %s hasn't synced", gvr)
			}
		}
		return nil
	}(); err != nil {
		klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
		c.InformerManager.Stop(cluster.Name)
		return err
	}

	return nil
}

// getGVRsFromWork traverses the manifests in the Work to build the set of GroupVersionResources.
func (c *WorkStatusController) getGVRsFromWork(work *workv1alpha1.Work) (map[schema.GroupVersionResource]bool, error) {
	gvrTargets := map[schema.GroupVersionResource]bool{}
	for _, manifest := range work.Spec.Workload.Manifests {
		workload := &unstructured.Unstructured{}
		err := workload.UnmarshalJSON(manifest.Raw)
		if err != nil {
			klog.Errorf("Failed to unmarshal workload. Error: %v.", err)
			return nil, err
		}
		gvr, err := restmapper.GetGroupVersionResource(c.RESTMapper, workload.GroupVersionKind())
		if err != nil {
			klog.Errorf("Failed to get GVR from GVK for resource %s/%s. Error: %v.", workload.GetNamespace(), workload.GetName(), err)
			return nil, err
		}
		gvrTargets[gvr] = true
	}
	return gvrTargets, nil
}

// getSingleClusterManager gets the singleClusterInformerManager for the given cluster.
// If the manager does not exist, create it; otherwise, get it from the map.
func (c *WorkStatusController) getSingleClusterManager(cluster *clusterv1alpha1.Cluster) (genericmanager.SingleClusterInformerManager, error) {
	// TODO(chenxianpao): If cluster A is removed and then a new cluster also named A joins Karmada,
	// the cache in the informer manager should be updated.
	singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
	if singleClusterInformerManager == nil {
		dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
		if err != nil {
			klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
			return nil, err
		}
		singleClusterInformerManager = c.InformerManager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0)
	}
	return singleClusterInformerManager, nil
}

// SetupWithManager creates a controller and registers it with the controller manager.
func (c *WorkStatusController) SetupWithManager(mgr controllerruntime.Manager) error {
	return controllerruntime.NewControllerManagedBy(mgr).
		For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)).
		WithOptions(controller.Options{
			RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions),
		}).Complete(c)
}
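
// A minimal wiring sketch (illustrative only; mgr, stopCh, informerManager,
// objectWatcher, workPredicateFn, resourceInterpreter and opts are assumed to
// come from the caller's context, not from this file):
//
//	c := &WorkStatusController{
//		Client:                      mgr.GetClient(),
//		EventRecorder:               mgr.GetEventRecorderFor(WorkStatusControllerName),
//		RESTMapper:                  mgr.GetRESTMapper(),
//		InformerManager:             informerManager, // genericmanager.MultiClusterInformerManager
//		StopChan:                    stopCh,
//		ConcurrentWorkStatusSyncs:   5,
//		ObjectWatcher:               objectWatcher,
//		PredicateFunc:               workPredicateFn,
//		ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
//		ClusterCacheSyncTimeout:     opts.ClusterCacheSyncTimeout,
//		RateLimiterOptions:          opts.RateLimiterOptions,
//		ResourceInterpreter:         resourceInterpreter,
//	}
//	c.RunWorkQueue()
//	if err := c.SetupWithManager(mgr); err != nil {
//		return err
//	}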

func (c *WorkStatusController) eventf(object *unstructured.Unstructured, eventType, reason, messageFmt string, args ...interface{}) {
	ref, err := helper.GenEventRef(object)
	if err != nil {
		klog.Errorf("Ignore event(%s) as failing to build event reference for: kind=%s, %s due to %v", reason, object.GetKind(), klog.KObj(object), err)
		return
	}
	c.EventRecorder.Eventf(ref, eventType, reason, messageFmt, args...)
}