Compare commits

9 commits (34c36a91d5...a4795803d1):

- a4795803d1
- 5f4bd5e765
- 3024d3321e
- a1290871ea
- bcb3b08376
- f4f63c8d25
- be98c622e0
- 96f4744eb2
- 689680162e
@@ -42,7 +42,7 @@ jobs:
         with:
           go-version-file: go.mod
       - name: Install Cosign
-        uses: sigstore/cosign-installer@v3.9.1
+        uses: sigstore/cosign-installer@v3.9.2
         with:
           cosign-release: 'v2.2.3'
       - name: install QEMU
@@ -42,7 +42,7 @@ jobs:
         with:
           go-version-file: go.mod
       - name: Install Cosign
-        uses: sigstore/cosign-installer@v3.9.1
+        uses: sigstore/cosign-installer@v3.9.2
        with:
           cosign-release: 'v2.2.3'
       - name: install QEMU
@@ -18,6 +18,7 @@ package app
 
 import (
 	"context"
+	"fmt"
 	"net/http"
 	"testing"
 	"time"
@@ -27,6 +28,7 @@ import (
 
 	"github.com/karmada-io/karmada/cmd/descheduler/app/options"
 	"github.com/karmada-io/karmada/pkg/util/names"
+	testingutil "github.com/karmada-io/karmada/pkg/util/testing"
 )
 
 func TestNewDeschedulerCommand(t *testing.T) {
@@ -66,8 +68,10 @@ func TestDeschedulerCommandFlagParsing(t *testing.T) {
 }
 
 func TestServeHealthzAndMetrics(t *testing.T) {
-	healthAddress := "127.0.0.1:8082"
-	metricsAddress := "127.0.0.1:8083"
+	ports, err := testingutil.GetFreePorts("127.0.0.1", 2)
+	require.NoError(t, err)
+	healthAddress := fmt.Sprintf("127.0.0.1:%d", ports[0])
+	metricsAddress := fmt.Sprintf("127.0.0.1:%d", ports[1])
 
 	go serveHealthzAndMetrics(healthAddress, metricsAddress)
 

@@ -18,6 +18,7 @@ package app
 
 import (
 	"context"
+	"fmt"
 	"net/http"
 	"testing"
 	"time"
@@ -27,6 +28,7 @@ import (
 
 	"github.com/karmada-io/karmada/cmd/scheduler/app/options"
 	"github.com/karmada-io/karmada/pkg/util/names"
+	testingutil "github.com/karmada-io/karmada/pkg/util/testing"
 )
 
 func TestNewSchedulerCommand(t *testing.T) {
@@ -66,8 +68,10 @@ func TestSchedulerCommandFlagParsing(t *testing.T) {
 }
 
 func TestServeHealthzAndMetrics(t *testing.T) {
-	healthAddress := "127.0.0.1:8082"
-	metricsAddress := "127.0.0.1:8083"
+	ports, err := testingutil.GetFreePorts("127.0.0.1", 2)
+	require.NoError(t, err)
+	healthAddress := fmt.Sprintf("127.0.0.1:%d", ports[0])
+	metricsAddress := fmt.Sprintf("127.0.0.1:%d", ports[1])
 
 	go serveHealthzAndMetrics(healthAddress, metricsAddress)
 
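The two test hunks above replace hard-coded ports (8082/8083), which can collide when tests run in parallel, with ports reserved at runtime via `testingutil.GetFreePorts`. The helper's implementation is not part of this diff; the following is only a minimal sketch of how such a helper is commonly written (assuming it returns the requested number of free TCP ports on the given host), not the actual karmada code:

```go
package testingutil

import (
	"fmt"
	"net"
)

// GetFreePorts is a sketch of a free-port helper like the one the tests
// import; the real implementation in pkg/util/testing may differ. It asks
// the kernel for ephemeral ports by listening on port 0, records each
// assigned port, and releases the listeners before returning.
func GetFreePorts(host string, count int) ([]int, error) {
	ports := make([]int, 0, count)
	listeners := make([]net.Listener, 0, count)
	// Keep every listener open until the end so the same port cannot be
	// handed out twice within one call.
	defer func() {
		for _, l := range listeners {
			l.Close()
		}
	}()
	for i := 0; i < count; i++ {
		l, err := net.Listen("tcp", fmt.Sprintf("%s:0", host))
		if err != nil {
			return nil, err
		}
		listeners = append(listeners, l)
		ports = append(ports, l.Addr().(*net.TCPAddr).Port)
	}
	return ports, nil
}
```

Note that a small race remains with any helper of this shape: another process could grab a port between the listener closing and the test binding it, which is why the ports are only reserved, not guaranteed.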
@@ -139,13 +139,13 @@ func (c *FHPAController) SetupWithManager(mgr controllerruntime.Manager) error {
 // The Controller will requeue the Request to be processed again if an error is non-nil or
 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
 func (c *FHPAController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
-	klog.V(4).Infof("Reconciling FederatedHPA %s.", req.NamespacedName.String())
+	klog.V(4).InfoS("Reconciling FederatedHPA", "namespacedName", req.NamespacedName.String())
 
 	hpa := &autoscalingv1alpha1.FederatedHPA{}
 	key := req.NamespacedName.String()
 	if err := c.Client.Get(ctx, req.NamespacedName, hpa); err != nil {
 		if apierrors.IsNotFound(err) {
-			klog.Infof("FederatedHPA %s has been deleted in %s", req.Name, req.Namespace)
+			klog.InfoS("FederatedHPA has been deleted in namespace", "hpaName", req.Name, "namespace", req.Namespace)
 			c.recommendationsLock.Lock()
 			delete(c.recommendations, key)
 			c.recommendationsLock.Unlock()
@@ -344,7 +344,7 @@ func (c *FHPAController) reconcileAutoscaler(ctx context.Context, hpa *autoscali
 		retErr = err
 	}
 
-	klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)
+	klog.V(4).InfoS("proposing desired replicas for resource", "desiredReplicas", metricDesiredReplicas, "metricName", metricName, "metricTimestamp", metricTimestamp, "resource", reference)
 
 	rescaleMetric := ""
 	if metricDesiredReplicas > desiredReplicas {
@@ -382,8 +382,8 @@ func (c *FHPAController) reconcileAutoscaler(ctx context.Context, hpa *autoscali
 		setCondition(hpa, autoscalingv2.AbleToScale, corev1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
 		c.EventRecorder.Eventf(hpa, corev1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
 		c.storeScaleEvent(hpa.Spec.Behavior, key, currentReplicas, desiredReplicas)
-		klog.Infof("Successful rescale of %s, old size: %d, new size: %d, reason: %s",
-			hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
+		klog.InfoS("Successfully rescaled FederatedHPA",
+			"hpaName", hpa.Name, "currentReplicas", currentReplicas, "desiredReplicas", desiredReplicas, "rescaleReason", rescaleReason)
 
 		if desiredReplicas > currentReplicas {
 			actionLabel = monitor.ActionLabelScaleUp
@@ -391,7 +391,7 @@ func (c *FHPAController) reconcileAutoscaler(ctx context.Context, hpa *autoscali
 			actionLabel = monitor.ActionLabelScaleDown
 		}
 	} else {
-		klog.V(4).Infof("decided not to scale %s to %v (last scale time was %s)", reference, desiredReplicas, hpa.Status.LastScaleTime)
+		klog.V(4).InfoS("decided not to scale resource", "resource", reference, "desiredReplicas", desiredReplicas, "hpaLastScaleTime", hpa.Status.LastScaleTime)
 		desiredReplicas = currentReplicas
 	}
 
@@ -484,19 +484,19 @@ func (c *FHPAController) scaleForTargetCluster(ctx context.Context, clusters []s
 	for _, cluster := range clusters {
 		clusterClient, err := c.ClusterScaleClientSetFunc(cluster, c.Client)
 		if err != nil {
-			klog.Errorf("Failed to get cluster client of cluster %s.", cluster)
+			klog.ErrorS(err, "Failed to get cluster client of cluster", "cluster", cluster)
 			continue
 		}
 
 		clusterInformerManager, err := c.buildPodInformerForCluster(clusterClient)
 		if err != nil {
-			klog.Errorf("Failed to get or create informer for cluster %s. Error: %v.", cluster, err)
+			klog.ErrorS(err, "Failed to get or create informer for cluster", "cluster", cluster)
 			continue
 		}
 
 		scale, err := clusterClient.ScaleClient.Scales(hpa.Namespace).Get(ctx, targetGR, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
 		if err != nil {
-			klog.Errorf("Failed to get scale subResource of resource %s in cluster %s.", hpa.Spec.ScaleTargetRef.Name, cluster)
+			klog.ErrorS(err, "Failed to get scale subResource of resource in cluster", "resource", hpa.Spec.ScaleTargetRef.Name, "cluster", cluster)
 			continue
 		}
 
@@ -523,19 +523,19 @@ func (c *FHPAController) scaleForTargetCluster(ctx context.Context, clusters []s
 
 		podInterface, err := clusterInformerManager.Lister(podGVR)
 		if err != nil {
-			klog.Errorf("Failed to get podInterface for cluster %s.", cluster)
+			klog.ErrorS(err, "Failed to get podInterface for cluster", "cluster", cluster)
 			continue
 		}
 
 		podLister, ok := podInterface.(listcorev1.PodLister)
 		if !ok {
-			klog.Errorf("Failed to convert interface to PodLister for cluster %s.", cluster)
+			klog.ErrorS(nil, "Failed to convert interface to PodLister for cluster", "cluster", cluster)
 			continue
 		}
 
 		podList, err := podLister.Pods(hpa.Namespace).List(selector)
 		if err != nil {
-			klog.Errorf("Failed to get podList for cluster %s.", cluster)
+			klog.ErrorS(err, "Failed to get podList for cluster", "cluster", cluster)
 			continue
 		}
 
@@ -561,7 +561,7 @@ func (c *FHPAController) buildPodInformerForCluster(clusterScaleClient *util.Clu
 	}
 
 	if _, err := singleClusterInformerManager.Lister(podGVR); err != nil {
-		klog.Errorf("Failed to get the lister for pods: %v", err)
+		klog.ErrorS(err, "Failed to get the lister for pods")
 	}
 
 	c.TypedInformerManager.Start(clusterScaleClient.ClusterName)
@@ -576,7 +576,7 @@ func (c *FHPAController) buildPodInformerForCluster(clusterScaleClient *util.Clu
 		}
 		return nil
 	}(); err != nil {
-		klog.Errorf("Failed to sync cache for cluster: %s, error: %v", clusterScaleClient.ClusterName, err)
+		klog.ErrorS(err, "Failed to sync cache for cluster", "cluster", clusterScaleClient.ClusterName)
 		c.TypedInformerManager.Stop(clusterScaleClient.ClusterName)
 		return nil, err
 	}
@@ -1377,7 +1377,7 @@ func (c *FHPAController) updateStatus(ctx context.Context, hpa *autoscalingv1alp
 		c.EventRecorder.Event(hpa, corev1.EventTypeWarning, "FailedUpdateStatus", err.Error())
 		return fmt.Errorf("failed to update status for %s: %v", hpa.Name, err)
 	}
-	klog.V(2).Infof("Successfully updated status for %s", hpa.Name)
+	klog.V(2).InfoS("Successfully updated status for hpa", "hpaName", hpa.Name)
 	return nil
 }
 
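Every logging change in the controller hunks above follows the same klog migration: printf-style `Infof`/`Errorf`/`Warningf` calls become structured `InfoS`/`ErrorS` calls that take a constant message plus alternating key/value pairs. The standalone snippet below illustrates the pattern with hypothetical values (it is not code from the karmada tree); the `klog` calls themselves are the real `k8s.io/klog/v2` API:

```go
package main

import (
	"errors"

	"k8s.io/klog/v2"
)

func main() {
	defer klog.Flush()

	namespace, name := "default", "sample" // hypothetical values for illustration

	// Old style: variables are interpolated into the message, so log
	// processors must parse free-form text.
	klog.Infof("Reconciling FederatedHPA %s/%s", namespace, name)

	// New style: a fixed message plus key/value pairs, emitted in a
	// machine-readable form. Verbosity gating works exactly as before.
	klog.V(4).InfoS("Reconciling FederatedHPA", "namespace", namespace, "name", name)

	// ErrorS takes the error as its first argument; passing nil is
	// permitted when there is no error object, which is what the
	// PodLister-conversion hunk above does.
	err := errors.New("scale subresource not found")
	klog.ErrorS(err, "Failed to get scale subResource", "resource", name)
	klog.ErrorS(nil, "Failed to convert interface to PodLister", "cluster", "member1")
}
```

A practical benefit worth noting: because the message string is constant, every occurrence of the same event aggregates under one key in log tooling, while the variable parts stay queryable as fields.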
@@ -128,7 +128,7 @@ func getPodMetrics(rawMetrics []metricsapi.PodMetrics, resource corev1.ResourceN
 			resValue, found := c.Usage[resource]
 			if !found {
 				missing = true
-				klog.V(2).Infof("missing resource metric %v for %s/%s", resource, m.Namespace, m.Name)
+				klog.V(2).InfoS("missing resource metric", "resource", resource, "namespace", m.Namespace, "name", m.Name)
 				break
 			}
 			podSum += resValue.MilliValue()
@@ -18,6 +18,7 @@ package multiclusterservice
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"reflect"
 	"strings"
@@ -84,7 +85,7 @@ const EndpointSliceCollectControllerName = "endpointslice-collect-controller"
 
 // Reconcile performs a full reconciliation for the object referred to by the Request.
 func (c *EndpointSliceCollectController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
-	klog.V(4).Infof("Reconciling Work %s", req.NamespacedName.String())
+	klog.V(4).InfoS("Reconciling Work", "namespace", req.Namespace, "name", req.Name)
 
 	work := &workv1alpha1.Work{}
 	if err := c.Client.Get(ctx, req.NamespacedName, work); err != nil {
@@ -105,7 +106,7 @@ func (c *EndpointSliceCollectController) Reconcile(ctx context.Context, req cont
 
 	clusterName, err := names.GetClusterName(work.Namespace)
 	if err != nil {
-		klog.Errorf("Failed to get cluster name for work %s/%s", work.Namespace, work.Name)
+		klog.ErrorS(err, "Failed to get cluster name for work", "namespace", work.Namespace, "name", work.Name)
 		return controllerruntime.Result{}, err
 	}
 
@@ -144,14 +145,15 @@ func (c *EndpointSliceCollectController) collectEndpointSlice(key util.QueueKey)
 	ctx := context.Background()
 	fedKey, ok := key.(keys.FederatedKey)
 	if !ok {
-		klog.Errorf("Failed to collect endpointslice as invalid key: %v", key)
+		var ErrInvalidKey = errors.New("invalid key")
+		klog.ErrorS(ErrInvalidKey, "Failed to collect endpointslice as invalid key", "key", key)
 		return fmt.Errorf("invalid key")
 	}
 
-	klog.V(4).Infof("Begin to collect %s %s.", fedKey.Kind, fedKey.NamespaceKey())
+	klog.V(4).InfoS("Begin to collect", "kind", fedKey.Kind, "namespaceKey", fedKey.NamespaceKey())
 	if err := c.handleEndpointSliceEvent(ctx, fedKey); err != nil {
-		klog.Errorf("Failed to handle endpointSlice(%s) event, Error: %v",
-			fedKey.NamespaceKey(), err)
+		klog.ErrorS(err, "Failed to handle endpointSlice event", "namespaceKey",
+			fedKey.NamespaceKey())
 		return err
 	}
 
@@ -161,17 +163,18 @@ func (c *EndpointSliceCollectController) collectEndpointSlice(key util.QueueKey)
 func (c *EndpointSliceCollectController) buildResourceInformers(clusterName string) error {
 	cluster, err := util.GetCluster(c.Client, clusterName)
 	if err != nil {
-		klog.Errorf("Failed to get the given member cluster %s", clusterName)
+		klog.ErrorS(err, "Failed to get the given member cluster", "cluster", clusterName)
 		return err
 	}
 
 	if !util.IsClusterReady(&cluster.Status) {
-		klog.Errorf("Stop collect endpointslice for cluster(%s) as cluster not ready.", cluster.Name)
+		var ErrClusterNotReady = errors.New("cluster not ready")
+		klog.ErrorS(ErrClusterNotReady, "Stop collect endpointslice for cluster as cluster not ready.", "cluster", cluster.Name)
 		return fmt.Errorf("cluster(%s) not ready", cluster.Name)
 	}
 
 	if err := c.registerInformersAndStart(cluster); err != nil {
-		klog.Errorf("Failed to register informer for Cluster %s. Error: %v.", cluster.Name, err)
+		klog.ErrorS(err, "Failed to register informer for Cluster", "cluster", cluster.Name)
 		return err
 	}
 
@@ -185,7 +188,7 @@ func (c *EndpointSliceCollectController) registerInformersAndStart(cluster *clus
 	if singleClusterInformerManager == nil {
 		dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption)
 		if err != nil {
-			klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
+			klog.ErrorS(err, "Failed to build dynamic cluster client for cluster", "cluster", cluster.Name)
 			return err
 		}
 		singleClusterInformerManager = c.InformerManager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0)
@@ -220,7 +223,7 @@ func (c *EndpointSliceCollectController) registerInformersAndStart(cluster *clus
 		}
 		return nil
 	}(); err != nil {
-		klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
+		klog.ErrorS(err, "Failed to sync cache for cluster", "cluster", cluster.Name)
 		c.InformerManager.Stop(cluster.Name)
 		return err
 	}
@@ -245,7 +248,7 @@ func (c *EndpointSliceCollectController) genHandlerAddFunc(clusterName string) f
 		curObj := obj.(runtime.Object)
 		key, err := keys.FederatedKeyFunc(clusterName, curObj)
 		if err != nil {
-			klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind())
+			klog.ErrorS(err, "Failed to generate key for obj", "gvk", curObj.GetObjectKind().GroupVersionKind())
 			return
 		}
 		c.worker.Add(key)
@@ -258,7 +261,7 @@ func (c *EndpointSliceCollectController) genHandlerUpdateFunc(clusterName string
 		if !reflect.DeepEqual(oldObj, newObj) {
 			key, err := keys.FederatedKeyFunc(clusterName, curObj)
 			if err != nil {
-				klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind())
+				klog.ErrorS(err, "Failed to generate key for obj", "gvk", curObj.GetObjectKind().GroupVersionKind())
 				return
 			}
 			c.worker.Add(key)
@@ -278,7 +281,7 @@ func (c *EndpointSliceCollectController) genHandlerDeleteFunc(clusterName string
 		oldObj := obj.(runtime.Object)
 		key, err := keys.FederatedKeyFunc(clusterName, oldObj)
 		if err != nil {
-			klog.Warningf("Failed to generate key for obj: %s", oldObj.GetObjectKind().GroupVersionKind())
+			klog.ErrorS(err, "Failed to generate key for obj", "gvk", oldObj.GetObjectKind().GroupVersionKind())
 			return
 		}
 		c.worker.Add(key)
@@ -308,7 +311,7 @@ func (c *EndpointSliceCollectController) handleEndpointSliceEvent(ctx context.Co
 		util.MultiClusterServiceNamespaceLabel: endpointSliceKey.Namespace,
 		util.MultiClusterServiceNameLabel: util.GetLabelValue(endpointSliceObj.GetLabels(), discoveryv1.LabelServiceName),
 	})}); err != nil {
-		klog.Errorf("Failed to list workList reported by endpointSlice(%s/%s), error: %v", endpointSliceKey.Namespace, endpointSliceKey.Name, err)
+		klog.ErrorS(err, "Failed to list workList reported by endpointSlice", "namespace", endpointSliceKey.Namespace, "name", endpointSliceKey.Name)
 		return err
 	}
 
@@ -324,8 +327,8 @@ func (c *EndpointSliceCollectController) handleEndpointSliceEvent(ctx context.Co
 	}
 
 	if err = c.reportEndpointSliceWithEndpointSliceCreateOrUpdate(ctx, endpointSliceKey.Cluster, endpointSliceObj); err != nil {
-		klog.Errorf("Failed to handle endpointSlice(%s) event, Error: %v",
-			endpointSliceKey.NamespaceKey(), err)
+		klog.ErrorS(err, "Failed to handle endpointSlice event", "namespaceKey",
+			endpointSliceKey.NamespaceKey())
 		return err
 	}
 
@@ -336,7 +339,7 @@ func (c *EndpointSliceCollectController) collectTargetEndpointSlice(ctx context.
 	manager := c.InformerManager.GetSingleClusterManager(clusterName)
 	if manager == nil {
 		err := fmt.Errorf("failed to get informer manager for cluster %s", clusterName)
-		klog.Errorf("%v", err)
+		klog.ErrorS(err, "Failed to get informer manager for cluster")
 		return err
 	}
 
@@ -347,13 +350,13 @@ func (c *EndpointSliceCollectController) collectTargetEndpointSlice(ctx context.
 	})
 	epsList, err := manager.Lister(discoveryv1.SchemeGroupVersion.WithResource("endpointslices")).ByNamespace(svcNamespace).List(selector)
 	if err != nil {
-		klog.Errorf("Failed to list EndpointSlice for Service(%s/%s) in cluster(%s), Error: %v", svcNamespace, svcName, clusterName, err)
+		klog.ErrorS(err, "Failed to list EndpointSlice for Service in a cluster", "namespace", svcNamespace, "name", svcName, "cluster", clusterName)
 		return err
 	}
 	for _, epsObj := range epsList {
 		eps := &discoveryv1.EndpointSlice{}
 		if err = helper.ConvertToTypedObject(epsObj, eps); err != nil {
-			klog.Errorf("Failed to convert object to EndpointSlice, error: %v", err)
+			klog.ErrorS(err, "Failed to convert object to EndpointSlice")
 			return err
 		}
 		if util.GetLabelValue(eps.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceDispatchControllerLabelValue {
@@ -361,7 +364,7 @@ func (c *EndpointSliceCollectController) collectTargetEndpointSlice(ctx context.
 		}
 		epsUnstructured, err := helper.ToUnstructured(eps)
 		if err != nil {
-			klog.Errorf("Failed to convert EndpointSlice %s/%s to unstructured, error: %v", eps.GetNamespace(), eps.GetName(), err)
+			klog.ErrorS(err, "Failed to convert EndpointSlice to unstructured", "namespace", eps.GetNamespace(), "name", eps.GetName())
 			return err
 		}
 		if err = c.reportEndpointSliceWithEndpointSliceCreateOrUpdate(ctx, clusterName, epsUnstructured); err != nil {
@@ -394,7 +397,7 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
 
 	// indicate the Work should be not propagated since it's collected resource.
 	if err := ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, ctrlutil.WithSuspendDispatching(true)); err != nil {
-		klog.Errorf("Failed to create or update work(%s/%s), Error: %v", workMeta.Namespace, workMeta.Name, err)
+		klog.ErrorS(err, "Failed to create or update work", "namespace", workMeta.Namespace, "name", workMeta.Name)
		return err
 	}
 
@@ -408,7 +411,7 @@ func getEndpointSliceWorkMeta(ctx context.Context, c client.Client, ns string, w
 		Namespace: ns,
 		Name: workName,
 	}, existWork); err != nil && !apierrors.IsNotFound(err) {
-		klog.Errorf("Get EndpointSlice work(%s/%s) error:%v", ns, workName, err)
+		klog.ErrorS(err, "Get EndpointSlice work", "namespace", ns, "name", workName)
 		return metav1.ObjectMeta{}, err
 	}
 
@@ -449,7 +452,7 @@ func cleanupWorkWithEndpointSliceDelete(ctx context.Context, c client.Client, en
 		if apierrors.IsNotFound(err) {
 			return nil
 		}
-		klog.Errorf("Failed to get work(%s) in executionSpace(%s): %v", workNamespaceKey.String(), executionSpace, err)
+		klog.ErrorS(err, "Failed to get work in executionSpace", "namespaceKey", workNamespaceKey.String(), "executionSpace", executionSpace)
 		return err
 	}
 
@@ -472,14 +475,14 @@ func cleanProviderClustersEndpointSliceWork(ctx context.Context, c client.Client
 		work.Labels[util.EndpointSliceWorkManagedByLabel] = strings.Join(controllerSet.UnsortedList(), ".")
 
 		if err := c.Update(ctx, work); err != nil {
-			klog.Errorf("Failed to update work(%s/%s): %v", work.Namespace, work.Name, err)
+			klog.ErrorS(err, "Failed to update work", "namespace", work.Namespace, "name", work.Name)
 			return err
 		}
 		return nil
 	}
 
 	if err := c.Delete(ctx, work); err != nil {
-		klog.Errorf("Failed to delete work(%s/%s): %v", work.Namespace, work.Name, err)
+		klog.ErrorS(err, "Failed to delete work", "namespace", work.Namespace, "name", work.Name)
 		return err
 	}
 
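Two hunks in this controller (`collectEndpointSlice` and `buildResourceInformers`) have no error value in scope at the log site, so the change declares a local `errors.New` value purely to satisfy `klog.ErrorS`'s first argument. A common alternative, shown here only as a sketch of the idiom rather than what this diff does, is to declare the sentinel once at package level and reuse it for both the log call and the returned error (the `QueueKey` stand-in below is hypothetical):

```go
package example

import (
	"errors"
	"fmt"

	"k8s.io/klog/v2"
)

// Package-level sentinel, declared once instead of inside the function.
var errInvalidKey = errors.New("invalid key")

// collectEndpointSlice is a trimmed sketch of the pattern; a string type
// assertion stands in for the real keys.FederatedKey assertion.
func collectEndpointSlice(key any) error {
	fedKey, ok := key.(string)
	if !ok {
		klog.ErrorS(errInvalidKey, "Failed to collect endpointslice", "key", key)
		// Wrapping with %w keeps errors.Is(err, errInvalidKey) working
		// for callers, unlike returning a fresh fmt.Errorf each time.
		return fmt.Errorf("%w: %v", errInvalidKey, key)
	}
	_ = fedKey
	return nil
}
```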
@@ -66,7 +66,7 @@ type EndpointsliceDispatchController struct {
 
 // Reconcile performs a full reconciliation for the object referred to by the Request.
 func (c *EndpointsliceDispatchController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
-	klog.V(4).Infof("Reconciling Work %s", req.NamespacedName.String())
+	klog.V(4).InfoS("Reconciling Work", "namespacedName", req.NamespacedName.String())
 
 	work := &workv1alpha1.Work{}
 	if err := c.Client.Get(ctx, req.NamespacedName, work); err != nil {
@@ -83,7 +83,7 @@ func (c *EndpointsliceDispatchController) Reconcile(ctx context.Context, req con
 	mcsName := util.GetLabelValue(work.Labels, util.MultiClusterServiceNameLabel)
 	if !work.DeletionTimestamp.IsZero() || mcsName == "" {
 		if err := c.cleanupEndpointSliceFromConsumerClusters(ctx, work); err != nil {
-			klog.Errorf("Failed to cleanup EndpointSlice from consumer clusters for work %s/%s:%v", work.Namespace, work.Name, err)
+			klog.ErrorS(err, "Failed to cleanup EndpointSlice from consumer clusters for work", "namespace", work.Namespace, "name", work.Name)
 			return controllerruntime.Result{}, err
 		}
 		return controllerruntime.Result{}, nil
@@ -93,7 +93,7 @@ func (c *EndpointsliceDispatchController) Reconcile(ctx context.Context, req con
 	mcs := &networkingv1alpha1.MultiClusterService{}
 	if err := c.Client.Get(ctx, types.NamespacedName{Namespace: mcsNS, Name: mcsName}, mcs); err != nil {
 		if apierrors.IsNotFound(err) {
-			klog.Warningf("MultiClusterService %s/%s is not found", mcsNS, mcsName)
+			klog.ErrorS(err, "MultiClusterService is not found", "namespace", mcsNS, "name", mcsName)
 			return controllerruntime.Result{}, nil
 		}
 		return controllerruntime.Result{}, err
@@ -185,7 +185,7 @@ func (c *EndpointsliceDispatchController) newClusterFunc() handler.MapFunc {
 
 		mcsList := &networkingv1alpha1.MultiClusterServiceList{}
 		if err := c.Client.List(ctx, mcsList, &client.ListOptions{}); err != nil {
-			klog.Errorf("Failed to list MultiClusterService, error: %v", err)
+			klog.ErrorS(err, "Failed to list MultiClusterService")
 			return nil
 		}
 
@@ -193,7 +193,7 @@ func (c *EndpointsliceDispatchController) newClusterFunc() handler.MapFunc {
 		for _, mcs := range mcsList.Items {
 			clusterSet, err := helper.GetConsumerClusters(c.Client, mcs.DeepCopy())
 			if err != nil {
-				klog.Errorf("Failed to get provider clusters, error: %v", err)
+				klog.ErrorS(err, "Failed to get provider clusters")
 				continue
 			}
 
@@ -203,7 +203,7 @@ func (c *EndpointsliceDispatchController) newClusterFunc() handler.MapFunc {
 
 			workList, err := c.getClusterEndpointSliceWorks(ctx, mcs.Namespace, mcs.Name)
 			if err != nil {
-				klog.Errorf("Failed to list work, error: %v", err)
+				klog.ErrorS(err, "Failed to list work")
 				continue
 			}
 			for _, work := range workList {
@@ -229,7 +229,7 @@ func (c *EndpointsliceDispatchController) getClusterEndpointSliceWorks(ctx conte
 			util.MultiClusterServiceNamespaceLabel: mcsNamespace,
 		}),
 	}); err != nil {
-		klog.Errorf("Failed to list work, error: %v", err)
+		klog.ErrorS(err, "Failed to list work")
 		return nil, err
 	}
 
@@ -249,7 +249,7 @@ func (c *EndpointsliceDispatchController) newMultiClusterServiceFunc() handler.M
 
 		workList, err := c.getClusterEndpointSliceWorks(ctx, mcsNamespace, mcsName)
 		if err != nil {
-			klog.Errorf("Failed to list work, error: %v", err)
+			klog.ErrorS(err, "Failed to list work")
 			return nil
 		}
 
@@ -273,7 +273,7 @@ func (c *EndpointsliceDispatchController) cleanOrphanDispatchedEndpointSlice(ctx
 		util.MultiClusterServiceNameLabel: mcs.Name,
 		util.MultiClusterServiceNamespaceLabel: mcs.Namespace,
 	})}); err != nil {
-		klog.Errorf("Failed to list works, error is: %v", err)
+		klog.ErrorS(err, "Failed to list works")
 		return err
 	}
 
@@ -285,13 +285,13 @@ func (c *EndpointsliceDispatchController) cleanOrphanDispatchedEndpointSlice(ctx
 
 		consumerClusters, err := helper.GetConsumerClusters(c.Client, mcs)
 		if err != nil {
-			klog.Errorf("Failed to get consumer clusters, error is: %v", err)
+			klog.ErrorS(err, "Failed to get consumer clusters")
 			return err
 		}
 
 		cluster, err := names.GetClusterName(work.Namespace)
 		if err != nil {
-			klog.Errorf("Failed to get cluster name for work %s/%s", work.Namespace, work.Name)
+			klog.ErrorS(err, "Failed to get cluster name for work", "namespace", work.Namespace, "name", work.Name)
 			return err
 		}
 
@@ -300,7 +300,7 @@ func (c *EndpointsliceDispatchController) cleanOrphanDispatchedEndpointSlice(ctx
 		}
 
 		if err = c.Client.Delete(ctx, work.DeepCopy()); err != nil {
-			klog.Errorf("Failed to delete work %s/%s, error is: %v", work.Namespace, work.Name, err)
+			klog.ErrorS(err, "Failed to delete work", "namespace", work.Namespace, "name", work.Name)
 			return err
 		}
 	}
@@ -311,13 +311,13 @@ func (c *EndpointsliceDispatchController) cleanOrphanDispatchedEndpointSlice(ctx
 func (c *EndpointsliceDispatchController) dispatchEndpointSlice(ctx context.Context, work *workv1alpha1.Work, mcs *networkingv1alpha1.MultiClusterService) error {
 	epsSourceCluster, err := names.GetClusterName(work.Namespace)
 	if err != nil {
-		klog.Errorf("Failed to get EndpointSlice source cluster name for work %s/%s", work.Namespace, work.Name)
+		klog.ErrorS(err, "Failed to get EndpointSlice source cluster name for work", "namespace", work.Namespace, "name", work.Name)
 		return err
 	}
 
 	consumerClusters, err := helper.GetConsumerClusters(c.Client, mcs)
 	if err != nil {
-		klog.Errorf("Failed to get consumer clusters, error is: %v", err)
+		klog.ErrorS(err, "Failed to get consumer clusters")
 		return err
 	}
 	for clusterName := range consumerClusters {
@@ -330,7 +330,7 @@ func (c *EndpointsliceDispatchController) dispatchEndpointSlice(ctx context.Cont
 				c.EventRecorder.Eventf(mcs, corev1.EventTypeWarning, events.EventReasonClusterNotFound, "Consumer cluster %s is not found", clusterName)
 				continue
 			}
-			klog.Errorf("Failed to get cluster %s, error is: %v", clusterName, err)
+			klog.ErrorS(err, "Failed to get cluster", "cluster", clusterName)
 			return err
 		}
 		if !util.IsClusterReady(&clusterObj.Status) {
@@ -361,13 +361,13 @@ func (c *EndpointsliceDispatchController) ensureEndpointSliceWork(ctx context.Co
 	manifest := work.Spec.Workload.Manifests[0]
 	unstructuredObj := &unstructured.Unstructured{}
 	if err := unstructuredObj.UnmarshalJSON(manifest.Raw); err != nil {
-		klog.Errorf("Failed to unmarshal work manifest, error is: %v", err)
+		klog.ErrorS(err, "Failed to unmarshal work manifest")
 		return err
 	}
 
 	endpointSlice := &discoveryv1.EndpointSlice{}
 	if err := helper.ConvertToTypedObject(unstructuredObj, endpointSlice); err != nil {
-		klog.Errorf("Failed to convert unstructured object to typed object, error is: %v", err)
+		klog.ErrorS(err, "Failed to convert unstructured object to typed object")
 		return err
 	}
 
@@ -397,12 +397,12 @@ func (c *EndpointsliceDispatchController) ensureEndpointSliceWork(ctx context.Co
 	}
 	unstructuredEPS, err := helper.ToUnstructured(endpointSlice)
 	if err != nil {
-		klog.Errorf("Failed to convert typed object to unstructured object, error is: %v", err)
+		klog.ErrorS(err, "Failed to convert typed object to unstructured object")
 		return err
 	}
 	if err := ctrlutil.CreateOrUpdateWork(ctx, c.Client, workMeta, unstructuredEPS); err != nil {
-		klog.Errorf("Failed to dispatch EndpointSlice %s/%s from %s to cluster %s:%v",
-			work.GetNamespace(), work.GetName(), providerCluster, consumerCluster, err)
+		klog.ErrorS(err, "Failed to dispatch EndpointSlice",
+			"namespace", work.GetNamespace(), "name", work.GetName(), "providerCluster", providerCluster, "consumerCluster", consumerCluster)
 		return err
 	}
 
@@ -414,13 +414,13 @@ func (c *EndpointsliceDispatchController) cleanupEndpointSliceFromConsumerCluste
 	workList := &workv1alpha1.WorkList{}
 	err := c.Client.List(ctx, workList)
 	if err != nil {
-		klog.Errorf("Failed to list works serror: %v", err)
+		klog.ErrorS(err, "Failed to list works")
 		return err
 	}
 
 	epsSourceCluster, err := names.GetClusterName(work.Namespace)
 	if err != nil {
-		klog.Errorf("Failed to get EndpointSlice provider cluster name for work %s/%s", work.Namespace, work.Name)
+		klog.ErrorS(err, "Failed to get EndpointSlice provider cluster name for work", "namespace", work.Namespace, "name", work.Name)
 		return err
 	}
 	for _, item := range workList.Items {
@@ -434,7 +434,7 @@ func (c *EndpointsliceDispatchController) cleanupEndpointSliceFromConsumerCluste
 
 	if controllerutil.RemoveFinalizer(work, util.MCSEndpointSliceDispatchControllerFinalizer) {
 		if err := c.Client.Update(ctx, work); err != nil {
-			klog.Errorf("Failed to remove %s finalizer for work %s/%s:%v", util.MCSEndpointSliceDispatchControllerFinalizer, work.Namespace, work.Name, err)
+			klog.ErrorS(err, "Failed to remove finalizer for work", "finalizer", util.MCSEndpointSliceDispatchControllerFinalizer, "namespace", work.Namespace, "name", work.Name)
 			return err
 		}
 	}
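Several hunks in this controller and in the MCS controller below wrap `c.Client.Update` in `controllerutil.RemoveFinalizer`/`AddFinalizer` guards; both helpers from controller-runtime return `true` only when they actually changed the object's finalizer list, so the update (and the error log on failure) is skipped when nothing changed. A condensed sketch of that guard, with the finalizer name as a hypothetical placeholder:

```go
package example

import (
	"context"

	"k8s.io/klog/v2"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

const sampleFinalizer = "example.io/sample-finalizer" // hypothetical name

// removeFinalizerIfPresent issues an Update only when RemoveFinalizer
// reports that the finalizer was actually present and removed, avoiding
// no-op API calls.
func removeFinalizerIfPresent(ctx context.Context, c client.Client, obj client.Object) error {
	if controllerutil.RemoveFinalizer(obj, sampleFinalizer) {
		if err := c.Update(ctx, obj); err != nil {
			klog.ErrorS(err, "Failed to remove finalizer",
				"finalizer", sampleFinalizer,
				"namespace", obj.GetNamespace(), "name", obj.GetName())
			return err
		}
	}
	return nil
}
```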
@ -69,7 +69,7 @@ type MCSController struct {
|
||||||
// The Controller will requeue the Request to be processed again if an error is non-nil or
|
// The Controller will requeue the Request to be processed again if an error is non-nil or
|
||||||
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
|
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
|
||||||
func (c *MCSController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
|
func (c *MCSController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
|
||||||
klog.V(4).Infof("Reconciling MultiClusterService(%s/%s)", req.Namespace, req.Name)
|
klog.V(4).InfoS("Reconciling MultiClusterService", "namespace", req.Namespace, "name", req.Name)
|
||||||
|
|
||||||
mcs := &networkingv1alpha1.MultiClusterService{}
|
mcs := &networkingv1alpha1.MultiClusterService{}
|
||||||
if err := c.Client.Get(ctx, req.NamespacedName, mcs); err != nil {
|
if err := c.Client.Get(ctx, req.NamespacedName, mcs); err != nil {
|
||||||
|
@ -77,7 +77,7 @@ func (c *MCSController) Reconcile(ctx context.Context, req controllerruntime.Req
|
||||||
// The mcs no longer exist, in which case we stop processing.
|
// The mcs no longer exist, in which case we stop processing.
|
||||||
return controllerruntime.Result{}, nil
|
return controllerruntime.Result{}, nil
|
||||||
}
|
}
|
||||||
klog.Errorf("Failed to get MultiClusterService object(%s):%v", req.NamespacedName, err)
|
klog.ErrorS(err, "Failed to get MultiClusterService object", "namespacedName", req.NamespacedName)
|
||||||
return controllerruntime.Result{}, err
|
return controllerruntime.Result{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,7 +103,7 @@ func (c *MCSController) Reconcile(ctx context.Context, req controllerruntime.Req
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *MCSController) handleMultiClusterServiceDelete(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) (controllerruntime.Result, error) {
|
func (c *MCSController) handleMultiClusterServiceDelete(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) (controllerruntime.Result, error) {
|
||||||
klog.V(4).Infof("Begin to handle MultiClusterService(%s/%s) delete event", mcs.Namespace, mcs.Name)
|
klog.V(4).InfoS("Begin to handle MultiClusterService delete event", "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
|
|
||||||
if err := c.retrieveService(ctx, mcs); err != nil {
|
if err := c.retrieveService(ctx, mcs); err != nil {
|
||||||
c.EventRecorder.Event(mcs, corev1.EventTypeWarning, events.EventReasonSyncServiceFailed,
|
c.EventRecorder.Event(mcs, corev1.EventTypeWarning, events.EventReasonSyncServiceFailed,
|
||||||
|
@ -120,12 +120,12 @@ func (c *MCSController) handleMultiClusterServiceDelete(ctx context.Context, mcs
|
||||||
if controllerutil.RemoveFinalizer(mcs, util.MCSControllerFinalizer) {
|
if controllerutil.RemoveFinalizer(mcs, util.MCSControllerFinalizer) {
|
||||||
err := c.Client.Update(ctx, mcs)
|
err := c.Client.Update(ctx, mcs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to update MultiClusterService(%s/%s) with finalizer:%v", mcs.Namespace, mcs.Name, err)
|
klog.ErrorS(err, "Failed to update MultiClusterService with finalizer", "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
return controllerruntime.Result{}, err
|
return controllerruntime.Result{}, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(4).Infof("Success to delete MultiClusterService(%s/%s)", mcs.Namespace, mcs.Name)
|
klog.V(4).InfoS("Success to delete MultiClusterService", "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
return controllerruntime.Result{}, nil
|
return controllerruntime.Result{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,7 +135,7 @@ func (c *MCSController) retrieveMultiClusterService(ctx context.Context, mcs *ne
|
||||||
networkingv1alpha1.MultiClusterServicePermanentIDLabel: mcsID,
|
networkingv1alpha1.MultiClusterServicePermanentIDLabel: mcsID,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to list work by MultiClusterService(%s/%s): %v", mcs.Namespace, mcs.Name, err)
|
klog.ErrorS(err, "Failed to list work by MultiClusterService", "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,7 +145,7 @@ func (c *MCSController) retrieveMultiClusterService(ctx context.Context, mcs *ne
|
||||||
}
|
}
|
||||||
clusterName, err := names.GetClusterName(work.Namespace)
|
clusterName, err := names.GetClusterName(work.Namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to get member cluster name for work %s/%s:%v", work.Namespace, work.Name, work)
|
klog.ErrorS(err, "Failed to get member cluster name for work", "namespace", work.Namespace, "name", work.Name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,17 +154,17 @@ func (c *MCSController) retrieveMultiClusterService(ctx context.Context, mcs *ne
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = c.cleanProviderEndpointSliceWork(ctx, work.DeepCopy()); err != nil {
|
if err = c.cleanProviderEndpointSliceWork(ctx, work.DeepCopy()); err != nil {
|
||||||
klog.Errorf("Failed to clean provider EndpointSlice work(%s/%s):%v", work.Namespace, work.Name, err)
|
klog.ErrorS(err, "Failed to clean provider EndpointSlice work", "namespace", work.Namespace, "name", work.Name)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = c.Client.Delete(ctx, work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) {
|
if err = c.Client.Delete(ctx, work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) {
|
||||||
klog.Errorf("Error while deleting work(%s/%s): %v", work.Namespace, work.Name, err)
|
klog.ErrorS(err, "Error while deleting work", "namespace", work.Namespace, "name", work.Name)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(4).Infof("Success to clean up MultiClusterService(%s/%s) work: %v", mcs.Namespace, mcs.Name, err)
|
klog.V(4).InfoS("Success to clean up MultiClusterService", "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,7 +177,7 @@ func (c *MCSController) cleanProviderEndpointSliceWork(ctx context.Context, work
|
||||||
util.MultiClusterServiceNamespaceLabel: util.GetLabelValue(work.Labels, util.MultiClusterServiceNamespaceLabel),
|
util.MultiClusterServiceNamespaceLabel: util.GetLabelValue(work.Labels, util.MultiClusterServiceNamespaceLabel),
|
||||||
}),
|
}),
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
klog.Errorf("Failed to list workList reported by work(MultiClusterService)(%s/%s): %v", work.Namespace, work.Name, err)
|
klog.ErrorS(err, "Failed to list workList reported by work(MultiClusterService)", "namespace", work.Namespace, "name", work.Name)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,16 +204,16 @@ func (c *MCSController) cleanProviderEndpointSliceWork(ctx context.Context, work
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *MCSController) handleMultiClusterServiceCreateOrUpdate(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
|
func (c *MCSController) handleMultiClusterServiceCreateOrUpdate(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
|
||||||
klog.V(4).Infof("Begin to handle MultiClusterService(%s/%s) create or update event", mcs.Namespace, mcs.Name)
|
klog.V(4).InfoS("Begin to handle MultiClusterService create or update event", "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
|
|
||||||
providerClusters, err := helper.GetProviderClusters(c.Client, mcs)
|
providerClusters, err := helper.GetProviderClusters(c.Client, mcs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to get provider clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
|
klog.ErrorS(err, "Failed to get provider clusters by MultiClusterService", "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
consumerClusters, err := helper.GetConsumerClusters(c.Client, mcs)
|
consumerClusters, err := helper.GetConsumerClusters(c.Client, mcs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to get consumer clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
|
klog.ErrorS(err, "Failed to get consumer clusters by MultiClusterService", "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,7 +228,7 @@ func (c *MCSController) handleMultiClusterServiceCreateOrUpdate(ctx context.Cont
|
||||||
if controllerutil.RemoveFinalizer(mcs, util.MCSControllerFinalizer) {
|
if controllerutil.RemoveFinalizer(mcs, util.MCSControllerFinalizer) {
|
||||||
err := c.Client.Update(ctx, mcs)
|
err := c.Client.Update(ctx, mcs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to remove finalizer(%s) from MultiClusterService(%s/%s):%v", util.MCSControllerFinalizer, mcs.Namespace, mcs.Name, err)
|
klog.ErrorS(err, "Failed to remove finalizer from MultiClusterService", "finalizer", util.MCSControllerFinalizer, "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -239,7 +239,7 @@ func (c *MCSController) handleMultiClusterServiceCreateOrUpdate(ctx context.Cont
|
||||||
if controllerutil.AddFinalizer(mcs, util.MCSControllerFinalizer) {
|
if controllerutil.AddFinalizer(mcs, util.MCSControllerFinalizer) {
|
||||||
err = c.Client.Update(ctx, mcs)
|
err = c.Client.Update(ctx, mcs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to add finalizer(%s) to MultiClusterService(%s/%s): %v ", util.MCSControllerFinalizer, mcs.Namespace, mcs.Name, err)
|
klog.ErrorS(err, "Failed to add finalizer to MultiClusterService", "finalizer", util.MCSControllerFinalizer, "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -259,7 +259,7 @@ func (c *MCSController) handleMultiClusterServiceCreateOrUpdate(ctx context.Cont
|
||||||
err = c.Client.Get(ctx, types.NamespacedName{Namespace: mcs.Namespace, Name: mcs.Name}, svc)
|
err = c.Client.Get(ctx, types.NamespacedName{Namespace: mcs.Namespace, Name: mcs.Name}, svc)
|
||||||
// If the Service is deleted, the Service's ResourceBinding will be cleaned by GC
|
// If the Service is deleted, the Service's ResourceBinding will be cleaned by GC
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to get service(%s/%s):%v", mcs.Namespace, mcs.Name, err)
|
klog.ErrorS(err, "Failed to get service", "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -268,7 +268,7 @@ func (c *MCSController) handleMultiClusterServiceCreateOrUpdate(ctx context.Cont
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(4).Infof("Success to reconcile MultiClusterService(%s/%s)", mcs.Namespace, mcs.Name)
|
klog.V(4).InfoS("Success to reconcile MultiClusterService", "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -280,7 +280,7 @@ func (c *MCSController) propagateMultiClusterService(ctx context.Context, mcs *n
|
||||||
c.EventRecorder.Eventf(mcs, corev1.EventTypeWarning, events.EventReasonClusterNotFound, "Provider cluster %s is not found", clusterName)
|
c.EventRecorder.Eventf(mcs, corev1.EventTypeWarning, events.EventReasonClusterNotFound, "Provider cluster %s is not found", clusterName)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
klog.Errorf("Failed to get cluster %s, error is: %v", clusterName, err)
|
klog.ErrorS(err, "Failed to get cluster", "cluster", clusterName)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !util.IsClusterReady(&clusterObj.Status) {
|
if !util.IsClusterReady(&clusterObj.Status) {
|
||||||
|
@ -306,12 +306,12 @@ func (c *MCSController) propagateMultiClusterService(ctx context.Context, mcs *n
|
||||||
|
|
||||||
mcsObj, err := helper.ToUnstructured(mcs)
|
mcsObj, err := helper.ToUnstructured(mcs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to convert MultiClusterService(%s/%s) to unstructured object, err is %v", mcs.Namespace, mcs.Name, err)
|
klog.ErrorS(err, "Failed to convert MultiClusterService to unstructured object", "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, mcsObj, ctrlutil.WithSuspendDispatching(true)); err != nil {
|
if err = ctrlutil.CreateOrUpdateWork(ctx, c, workMeta, mcsObj, ctrlutil.WithSuspendDispatching(true)); err != nil {
|
||||||
klog.Errorf("Failed to create or update MultiClusterService(%s/%s) work in the given member cluster %s, err is %v",
|
klog.ErrorS(err, "Failed to create or update MultiClusterService work in the given member cluster",
|
||||||
mcs.Namespace, mcs.Name, clusterName, err)
|
"namespace", mcs.Namespace, "name", mcs.Name, "cluster", clusterName)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -323,7 +323,7 @@ func (c *MCSController) retrieveService(ctx context.Context, mcs *networkingv1al
 	svc := &corev1.Service{}
 	err := c.Client.Get(ctx, types.NamespacedName{Namespace: mcs.Namespace, Name: mcs.Name}, svc)
 	if err != nil && !apierrors.IsNotFound(err) {
-		klog.Errorf("Failed to get service(%s/%s):%v", mcs.Namespace, mcs.Name, err)
+		klog.ErrorS(err, "Failed to get service", "namespace", mcs.Namespace, "name", mcs.Name)
 		return err
 	}
 
@@ -338,7 +338,7 @@ func (c *MCSController) retrieveService(ctx context.Context, mcs *networkingv1al
 	}
 
 	if err = c.Client.Update(ctx, svcCopy); err != nil {
-		klog.Errorf("Failed to update service(%s/%s):%v", mcs.Namespace, mcs.Name, err)
+		klog.ErrorS(err, "Failed to update service", "namespace", mcs.Namespace, "name", mcs.Name)
 		return err
 	}
 
@@ -348,7 +348,7 @@ func (c *MCSController) retrieveService(ctx context.Context, mcs *networkingv1al
 		if apierrors.IsNotFound(err) {
 			return nil
 		}
-		klog.Errorf("Failed to get ResourceBinding(%s/%s):%v", mcs.Namespace, names.GenerateBindingName(svc.Kind, svc.Name), err)
+		klog.ErrorS(err, "Failed to get ResourceBinding", "namespace", mcs.Namespace, "name", names.GenerateBindingName(svc.Kind, svc.Name))
 		return err
 	}
 
@@ -364,7 +364,7 @@ func (c *MCSController) retrieveService(ctx context.Context, mcs *networkingv1al
 		delete(rbCopy.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel)
 	}
 	if err := c.Client.Update(ctx, rbCopy); err != nil {
-		klog.Errorf("Failed to update ResourceBinding(%s/%s):%v", mcs.Namespace, names.GenerateBindingName(svc.Kind, svc.Name), err)
+		klog.ErrorS(err, "Failed to update ResourceBinding", "namespace", mcs.Namespace, "name", names.GenerateBindingName(svc.Kind, svc.Name))
 		return err
 	}
 
@@ -374,13 +374,13 @@ func (c *MCSController) retrieveService(ctx context.Context, mcs *networkingv1al
 func (c *MCSController) propagateService(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService, svc *corev1.Service,
 	providerClusters, consumerClusters sets.Set[string]) error {
 	if err := c.claimMultiClusterServiceForService(ctx, svc, mcs); err != nil {
-		klog.Errorf("Failed to claim for Service(%s/%s), err is %v", svc.Namespace, svc.Name, err)
+		klog.ErrorS(err, "Failed to claim for Service", "namespace", svc.Namespace, "name", svc.Name)
 		return err
 	}
 
 	binding, err := c.buildResourceBinding(svc, mcs, providerClusters, consumerClusters)
 	if err != nil {
-		klog.Errorf("Failed to build ResourceBinding for Service(%s/%s), err is %v", svc.Namespace, svc.Name, err)
+		klog.ErrorS(err, "Failed to build ResourceBinding for Service", "namespace", svc.Namespace, "name", svc.Name)
 		return err
 	}
 
@@ -417,17 +417,17 @@ func (c *MCSController) propagateService(ctx context.Context, mcs *networkingv1a
 		return nil
 	})
 	if err != nil {
-		klog.Errorf("Failed to create/update ResourceBinding(%s/%s):%v", bindingCopy.Namespace, bindingCopy.Name, err)
+		klog.ErrorS(err, "Failed to create/update ResourceBinding", "namespace", bindingCopy.Namespace, "name", bindingCopy.Name)
 		return err
 	}
 
 	switch operationResult {
 	case controllerutil.OperationResultCreated:
-		klog.Infof("Create ResourceBinding(%s/%s) successfully.", binding.GetNamespace(), binding.GetName())
+		klog.InfoS("Create ResourceBinding successfully.", "namespace", binding.GetNamespace(), "name", binding.GetName())
 	case controllerutil.OperationResultUpdated:
-		klog.Infof("Update ResourceBinding(%s/%s) successfully.", binding.GetNamespace(), binding.GetName())
+		klog.InfoS("Update ResourceBinding successfully.", "namespace", binding.GetNamespace(), "name", binding.GetName())
 	default:
-		klog.V(2).Infof("ResourceBinding(%s/%s) is up to date.", binding.GetNamespace(), binding.GetName())
+		klog.V(2).InfoS("ResourceBinding is up to date.", "namespace", binding.GetNamespace(), "name", binding.GetName())
 	}
 
 	return nil
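The `operationResult` switch above distinguishes the three outcomes of controller-runtime's `CreateOrUpdate` helper. A hedged, runnable sketch of that helper's contract (the fake client and ConfigMap are placeholders for illustration, not from this diff):

```go
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

func main() {
	c := fake.NewClientBuilder().Build()
	cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo"}}

	// The mutate callback sets the desired state; CreateOrUpdate reports
	// whether the object was created, updated, or left unchanged.
	result, err := controllerutil.CreateOrUpdate(context.TODO(), c, cm, func() error {
		cm.Data = map[string]string{"key": "value"}
		return nil
	})
	if err != nil {
		panic(err)
	}

	switch result {
	case controllerutil.OperationResultCreated:
		fmt.Println("created")
	case controllerutil.OperationResultUpdated:
		fmt.Println("updated")
	default: // controllerutil.OperationResultNone
		fmt.Println("up to date")
	}
}
```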
@@ -500,7 +500,7 @@ func (c *MCSController) claimMultiClusterServiceForService(ctx context.Context,
 	svcCopy.Annotations[networkingv1alpha1.MultiClusterServiceNamespaceAnnotation] = mcs.Namespace
 
 	if err := c.Client.Update(ctx, svcCopy); err != nil {
-		klog.Errorf("Failed to update service(%s/%s):%v ", svc.Namespace, svc.Name, err)
+		klog.ErrorS(err, "Failed to update service", "namespace", svc.Namespace, "name", svc.Name)
 		return err
 	}
 
@@ -608,7 +608,7 @@ func (c *MCSController) serviceHasCrossClusterMultiClusterService(svc *corev1.Se
 	if err := c.Client.Get(context.Background(),
 		types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}, mcs); err != nil {
 		if !apierrors.IsNotFound(err) {
-			klog.Errorf("Failed to get MultiClusterService(%s/%s):%v", svc.Namespace, svc.Name, err)
+			klog.ErrorS(err, "Failed to get MultiClusterService", "namespace", svc.Namespace, "name", svc.Name)
 		}
 		return false
 	}
@@ -626,10 +626,10 @@ func (c *MCSController) clusterMapFunc() handler.MapFunc {
 			return nil
 		}
 
-		klog.V(4).Infof("Begin to sync mcs with cluster %s.", clusterName)
+		klog.V(4).InfoS("Begin to sync mcs with cluster", "cluster", clusterName)
 		mcsList := &networkingv1alpha1.MultiClusterServiceList{}
 		if err := c.Client.List(ctx, mcsList, &client.ListOptions{}); err != nil {
-			klog.Errorf("Failed to list MultiClusterService, error: %v", err)
+			klog.ErrorS(err, "Failed to list MultiClusterService")
 			return nil
 		}
 
@@ -658,7 +658,7 @@ func (c *MCSController) needSyncMultiClusterService(mcs *networkingv1alpha1.Mult
 
 	providerClusters, err := helper.GetProviderClusters(c.Client, mcs)
 	if err != nil {
-		klog.Errorf("Failed to get provider clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
+		klog.ErrorS(err, "Failed to get provider clusters by MultiClusterService", "namespace", mcs.Namespace, "name", mcs.Name)
 		return false, err
 	}
 	if providerClusters.Has(clusterName) {
|
||||||
|
|
||||||
consumerClusters, err := helper.GetConsumerClusters(c.Client, mcs)
|
consumerClusters, err := helper.GetConsumerClusters(c.Client, mcs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to get consumer clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
|
klog.ErrorS(err, "Failed to get consumer clusters by MultiClusterService", "namespace", mcs.Namespace, "name", mcs.Name)
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
if consumerClusters.Has(clusterName) {
|
if consumerClusters.Has(clusterName) {
|
||||||
|
|
|
@@ -627,7 +627,8 @@ func getNodeAvailable(allocatable corev1.ResourceList, podResources *util.Resour
 	// When too many pods have been created, scheduling will fail so that the allocating pods number may be huge.
 	// If allowedPodNumber is less than or equal to 0, we don't allow more pods to be created.
 	if allowedPodNumber <= 0 {
-		klog.Warning("The number of schedulable Pods on the node is less than or equal to 0, we won't add the node to cluster resource models.")
+		klog.InfoS("The number of schedulable Pods on the node is less than or equal to 0, " +
+			"we won't add the node to cluster resource models.")
 		return nil
 	}
 
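Note the severity change in the hunk above: klog's structured API exposes only `InfoS` and `ErrorS`, so the former `klog.Warning` has no direct structured counterpart and is downgraded to `InfoS` here. A minimal before/after sketch (the message and value are illustrative):

```go
package main

import "k8s.io/klog/v2"

func main() {
	// Legacy call: emits a W-severity, unstructured line.
	klog.Warning("the node has no schedulable Pod slots")

	// Structured replacement: I severity, message plus key/value context.
	klog.InfoS("the node has no schedulable Pod slots", "allowedPodNumber", 0)

	klog.Flush()
}
```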
@@ -21,7 +21,9 @@ import (
 	"crypto/rsa"
 	"crypto/x509"
 	"encoding/pem"
+	"fmt"
 	"math/big"
+	"net"
 	"time"
 )
 
@@ -69,3 +71,37 @@ func GenerateTestCACertificate() (string, string, error) {
 
 	return string(certPEMData), string(privKeyPEMData), nil
 }
+
+// GetFreePorts attempts to find n available TCP ports on the specified host. It
+// returns a slice of allocated port numbers or an error if it fails to acquire
+// them.
+func GetFreePorts(host string, n int) ([]int, error) {
+	ports := make([]int, 0, n)
+	listeners := make([]net.Listener, 0, n)
+
+	// Make sure we close all listeners if there's an error.
+	defer func() {
+		for _, l := range listeners {
+			l.Close()
+		}
+	}()
+
+	for i := 0; i < n; i++ {
+		listener, err := net.Listen("tcp", fmt.Sprintf("%s:0", host))
+		if err != nil {
+			return nil, err
+		}
+		listeners = append(listeners, listener)
+		tcpAddr, ok := listener.Addr().(*net.TCPAddr)
+		if !ok {
+			return nil, fmt.Errorf("listener address is not a *net.TCPAddr")
+		}
+		ports = append(ports, tcpAddr.Port)
+	}
+
+	// At this point we have all ports, so we can close the listeners.
+	for _, l := range listeners {
+		l.Close()
+	}
+	return ports, nil
+}
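A hedged usage sketch for the new helper (the HTTP server and the import alias are assumptions for illustration, not from this diff). Since `GetFreePorts` closes its listeners before returning, a returned port can in principle be re-claimed by another process, so the caller's subsequent bind should still be treated as fallible:

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/http"

	// Assumed import path/alias for the package this diff modifies.
	testingutil "github.com/karmada-io/karmada/pkg/util/testing"
)

func main() {
	ports, err := testingutil.GetFreePorts("127.0.0.1", 1)
	if err != nil {
		log.Fatalf("failed to reserve a port: %v", err)
	}

	addr := fmt.Sprintf("127.0.0.1:%d", ports[0])
	// The port was free a moment ago, but another process may have
	// grabbed it since GetFreePorts closed its listener.
	listener, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("port %d was taken in the meantime: %v", ports[0], err)
	}
	log.Printf("serving on %s", addr)
	log.Fatal(http.Serve(listener, http.NotFoundHandler()))
}
```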