fix application controller

Signed-off-by: Poor12 <shentiecheng@huawei.com>
Poor12 committed 2023-05-06 15:30:47 +08:00
parent 99f3c4f1f4
commit 3e63826f97
4 changed files with 62 additions and 73 deletions

pkg/controllers/applicationfailover/common.go

@@ -4,37 +4,37 @@ import (
 	"sync"

 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"

 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
-	"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
 )

 type workloadUnhealthyMap struct {
 	sync.RWMutex
-	// key is the resource type
+	// key is the NamespacedName of the binding
 	// value is also a map. Its key is the cluster where the unhealthy workload resides.
 	// Its value is the time when the unhealthy state was first observed.
-	workloadUnhealthy map[keys.ClusterWideKey]map[string]metav1.Time
+	workloadUnhealthy map[types.NamespacedName]map[string]metav1.Time
 }

 func newWorkloadUnhealthyMap() *workloadUnhealthyMap {
 	return &workloadUnhealthyMap{
-		workloadUnhealthy: make(map[keys.ClusterWideKey]map[string]metav1.Time),
+		workloadUnhealthy: make(map[types.NamespacedName]map[string]metav1.Time),
 	}
 }

-func (m *workloadUnhealthyMap) delete(key keys.ClusterWideKey) {
+func (m *workloadUnhealthyMap) delete(key types.NamespacedName) {
 	m.Lock()
 	defer m.Unlock()
 	delete(m.workloadUnhealthy, key)
 }

-func (m *workloadUnhealthyMap) hasWorkloadBeenUnhealthy(resource keys.ClusterWideKey, cluster string) bool {
+func (m *workloadUnhealthyMap) hasWorkloadBeenUnhealthy(key types.NamespacedName, cluster string) bool {
 	m.RLock()
 	defer m.RUnlock()

-	unhealthyClusters := m.workloadUnhealthy[resource]
+	unhealthyClusters := m.workloadUnhealthy[key]
 	if unhealthyClusters == nil {
 		return false
 	}
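The re-keying from keys.ClusterWideKey to types.NamespacedName works because NamespacedName is a plain comparable struct of two strings, so the binding's own name/namespace can index the map directly, with no key derived from binding.Spec.Resource. A minimal sketch (hypothetical names, not part of this commit):

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/types"
	)

	func main() {
		// types.NamespacedName is two strings, so value equality drives lookups.
		m := make(map[types.NamespacedName]map[string]bool)
		key := types.NamespacedName{Namespace: "default", Name: "nginx-binding"}
		m[key] = map[string]bool{"member1": true}
		// An equal-by-value key finds the entry; no pointer identity involved.
		fmt.Println(m[types.NamespacedName{Namespace: "default", Name: "nginx-binding"}])
	}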
@@ -43,32 +43,32 @@ func (m *workloadUnhealthyMap) hasWorkloadBeenUnhealthy(resource keys.ClusterWid
 	return exist
 }

-func (m *workloadUnhealthyMap) setTimeStamp(resource keys.ClusterWideKey, cluster string) {
+func (m *workloadUnhealthyMap) setTimeStamp(key types.NamespacedName, cluster string) {
 	m.Lock()
 	defer m.Unlock()

-	unhealthyClusters := m.workloadUnhealthy[resource]
+	unhealthyClusters := m.workloadUnhealthy[key]
 	if unhealthyClusters == nil {
 		unhealthyClusters = make(map[string]metav1.Time)
 	}
 	unhealthyClusters[cluster] = metav1.Now()
-	m.workloadUnhealthy[resource] = unhealthyClusters
+	m.workloadUnhealthy[key] = unhealthyClusters
 }

-func (m *workloadUnhealthyMap) getTimeStamp(resource keys.ClusterWideKey, cluster string) metav1.Time {
+func (m *workloadUnhealthyMap) getTimeStamp(key types.NamespacedName, cluster string) metav1.Time {
 	m.RLock()
 	defer m.RUnlock()

-	unhealthyClusters := m.workloadUnhealthy[resource]
+	unhealthyClusters := m.workloadUnhealthy[key]
 	return unhealthyClusters[cluster]
 }

-func (m *workloadUnhealthyMap) deleteIrrelevantClusters(resource keys.ClusterWideKey, allClusters sets.Set[string]) {
+func (m *workloadUnhealthyMap) deleteIrrelevantClusters(key types.NamespacedName, allClusters sets.Set[string], healthyClusters []string) {
 	m.Lock()
 	defer m.Unlock()

-	unhealthyClusters := m.workloadUnhealthy[resource]
+	unhealthyClusters := m.workloadUnhealthy[key]
 	if unhealthyClusters == nil {
 		return
 	}
@@ -77,19 +77,27 @@ func (m *workloadUnhealthyMap) deleteIrrelevantClusters(resource keys.ClusterWid
 			delete(unhealthyClusters, cluster)
 		}
 	}
-	m.workloadUnhealthy[resource] = unhealthyClusters
+	for _, cluster := range healthyClusters {
+		delete(unhealthyClusters, cluster)
+	}
+	m.workloadUnhealthy[key] = unhealthyClusters
 }

-// filterIrrelevantClusters filters clusters which is in the process of eviction or in the Healthy/Unknown state.
-func filterIrrelevantClusters(aggregatedStatusItems []workv1alpha2.AggregatedStatusItem, resourceBindingSpec workv1alpha2.ResourceBindingSpec) []string {
-	var filteredClusters []string
+// distinguishUnhealthyClustersWithOthers distinguishes clusters that are in an unhealthy state (and not in the process of eviction) from the others.
+func distinguishUnhealthyClustersWithOthers(aggregatedStatusItems []workv1alpha2.AggregatedStatusItem, resourceBindingSpec workv1alpha2.ResourceBindingSpec) ([]string, []string) {
+	var unhealthyClusters, others []string
 	for _, aggregatedStatusItem := range aggregatedStatusItems {
 		cluster := aggregatedStatusItem.ClusterName
 		if aggregatedStatusItem.Health == workv1alpha2.ResourceUnhealthy && !resourceBindingSpec.ClusterInGracefulEvictionTasks(cluster) {
-			filteredClusters = append(filteredClusters, cluster)
+			unhealthyClusters = append(unhealthyClusters, cluster)
 		}
+		if aggregatedStatusItem.Health == workv1alpha2.ResourceHealthy || aggregatedStatusItem.Health == workv1alpha2.ResourceUnknown {
+			others = append(others, cluster)
+		}
 	}
-	return filteredClusters
+	return unhealthyClusters, others
 }
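A hedged usage sketch of the renamed helper (in-package, hypothetical member names): one pass now yields both the unhealthy clusters eligible for failover and the healthy/unknown clusters used later for map cleanup. Note that a cluster which is Unhealthy but already under graceful eviction falls into neither slice.

	items := []workv1alpha2.AggregatedStatusItem{
		{ClusterName: "member1", Health: workv1alpha2.ResourceHealthy},
		{ClusterName: "member2", Health: workv1alpha2.ResourceUnhealthy},
		{ClusterName: "member3", Health: workv1alpha2.ResourceUnknown},
	}
	spec := workv1alpha2.ResourceBindingSpec{} // no graceful-eviction tasks
	unhealthy, others := distinguishUnhealthyClustersWithOthers(items, spec)
	// unhealthy: ["member2"]; others: ["member1", "member3"]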

pkg/controllers/applicationfailover/common_test.go

@@ -7,12 +7,13 @@ import (
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
 )

-func TestFilterIrrelevantClusters(t *testing.T) {
+func TestDistinguishUnhealthyClustersWithOthers(t *testing.T) {
 	tests := []struct {
 		name                  string
 		aggregatedStatusItems []workv1alpha2.AggregatedStatusItem
 		resourceBindingSpec   workv1alpha2.ResourceBindingSpec
 		expectedClusters      []string
+		expectedOthers        []string
 	}{
 		{
 			name: "all applications are healthy",
@@ -28,6 +29,7 @@ func TestFilterIrrelevantClusters(t *testing.T) {
 			},
 			resourceBindingSpec: workv1alpha2.ResourceBindingSpec{},
 			expectedClusters:    nil,
+			expectedOthers:      []string{"member1", "member2"},
 		},
 		{
 			name: "all applications are unknown",
@@ -43,6 +45,7 @@ func TestFilterIrrelevantClusters(t *testing.T) {
 			},
 			resourceBindingSpec: workv1alpha2.ResourceBindingSpec{},
 			expectedClusters:    nil,
+			expectedOthers:      []string{"member1", "member2"},
 		},
 		{
 			name: "one application is unhealthy and not in gracefulEvictionTasks",
@@ -58,6 +61,7 @@ func TestFilterIrrelevantClusters(t *testing.T) {
 			},
 			resourceBindingSpec: workv1alpha2.ResourceBindingSpec{},
 			expectedClusters:    []string{"member2"},
+			expectedOthers:      []string{"member1"},
 		},
 		{
 			name: "one application is unhealthy and in gracefulEvictionTasks",
@@ -79,13 +83,14 @@ func TestFilterIrrelevantClusters(t *testing.T) {
 				},
 			},
 			expectedClusters: nil,
+			expectedOthers:   []string{"member1"},
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if got := filterIrrelevantClusters(tt.aggregatedStatusItems, tt.resourceBindingSpec); !reflect.DeepEqual(got, tt.expectedClusters) {
-				t.Errorf("filterIrrelevantClusters() = %v, want %v", got, tt.expectedClusters)
+			if got, gotOthers := distinguishUnhealthyClustersWithOthers(tt.aggregatedStatusItems, tt.resourceBindingSpec); !reflect.DeepEqual(got, tt.expectedClusters) || !reflect.DeepEqual(gotOthers, tt.expectedOthers) {
+				t.Errorf("distinguishUnhealthyClustersWithOthers() = (%v, %v), want (%v, %v)", got, gotOthers, tt.expectedClusters, tt.expectedOthers)
 			}
 		})
 	}
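The nil expectations are deliberate: the helper builds its results with append on nil slices, so when nothing matches the result stays nil, and reflect.DeepEqual distinguishes a nil slice from an empty one. A quick illustration of that pitfall:

	var got []string                                   // nil, as returned when no cluster matches
	fmt.Println(reflect.DeepEqual(got, []string(nil))) // true
	fmt.Println(reflect.DeepEqual(got, []string{}))    // false: nil != empty slice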

pkg/controllers/applicationfailover/crb_application_failover_controller.go

@@ -8,6 +8,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
@@ -25,7 +26,6 @@ import (
 	"github.com/karmada-io/karmada/pkg/features"
 	"github.com/karmada-io/karmada/pkg/resourceinterpreter"
 	"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
-	"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
 	"github.com/karmada-io/karmada/pkg/util/helper"
 )
@@ -52,22 +52,14 @@ func (c *CRBApplicationFailoverController) Reconcile(ctx context.Context, req co
 	binding := &workv1alpha2.ClusterResourceBinding{}
 	if err := c.Client.Get(ctx, req.NamespacedName, binding); err != nil {
 		if apierrors.IsNotFound(err) {
-			resource, err := helper.ConstructClusterWideKey(binding.Spec.Resource)
-			if err != nil {
-				return controllerruntime.Result{Requeue: true}, err
-			}
-			c.workloadUnhealthyMap.delete(resource)
+			c.workloadUnhealthyMap.delete(req.NamespacedName)
 			return controllerruntime.Result{}, nil
 		}
 		return controllerruntime.Result{Requeue: true}, err
 	}

 	if !c.clusterResourceBindingFilter(binding) {
-		resource, err := helper.ConstructClusterWideKey(binding.Spec.Resource)
-		if err != nil {
-			return controllerruntime.Result{Requeue: true}, err
-		}
-		c.workloadUnhealthyMap.delete(resource)
+		c.workloadUnhealthyMap.delete(req.NamespacedName)
 		return controllerruntime.Result{}, nil
 	}
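This hunk carries the substantive fix: the old not-found branch derived a cleanup key from binding.Spec.Resource, but Get had just failed, so the spec was still the zero value and the derived key could never match an entry stored while the workload was unhealthy, leaking map entries (and key construction on an empty spec could itself fail, forcing a pointless requeue). req.NamespacedName identifies the binding even after the object is gone, so the delete now always targets the right entry. Sketch of the failure mode:

	// After a failed Get, the binding is still the zero value:
	binding := &workv1alpha2.ClusterResourceBinding{}
	// Any key built from binding.Spec.Resource (empty APIVersion/Kind/Name)
	// cannot equal the key recorded during earlier reconciles; the request's
	// NamespacedName, by contrast, is always available.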
@@ -82,13 +74,13 @@ func (c *CRBApplicationFailoverController) Reconcile(ctx context.Context, req co
 	return controllerruntime.Result{}, nil
 }

-func (c *CRBApplicationFailoverController) detectFailure(clusters []string, tolerationSeconds *int32, resource keys.ClusterWideKey) (int32, []string) {
+func (c *CRBApplicationFailoverController) detectFailure(clusters []string, tolerationSeconds *int32, key types.NamespacedName) (int32, []string) {
 	var needEvictClusters []string
 	duration := int32(math.MaxInt32)

 	for _, cluster := range clusters {
-		if !c.workloadUnhealthyMap.hasWorkloadBeenUnhealthy(resource, cluster) {
-			c.workloadUnhealthyMap.setTimeStamp(resource, cluster)
+		if !c.workloadUnhealthyMap.hasWorkloadBeenUnhealthy(key, cluster) {
+			c.workloadUnhealthyMap.setTimeStamp(key, cluster)
 			if duration > *tolerationSeconds {
 				duration = *tolerationSeconds
 			}
@@ -97,7 +89,7 @@ func (c *CRBApplicationFailoverController) detectFailure(clusters []string, tole
 			// When the workload in a cluster is in an unhealthy state for more than the tolerance time,
 			// and the cluster has not been in the GracefulEvictionTasks,
 			// the cluster will be added to the list that needs to be evicted.
-			unHealthyTimeStamp := c.workloadUnhealthyMap.getTimeStamp(resource, cluster)
+			unHealthyTimeStamp := c.workloadUnhealthyMap.getTimeStamp(key, cluster)
 			timeNow := metav1.Now()
 			if timeNow.After(unHealthyTimeStamp.Add(time.Duration(*tolerationSeconds) * time.Second)) {
 				needEvictClusters = append(needEvictClusters, cluster)
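The eviction decision reduces to a persistence check against the first-observed timestamp; isolated as a sketch (hypothetical helper, not part of the commit):

	func shouldEvict(firstObserved metav1.Time, tolerationSeconds int32) bool {
		// Evict only once the unhealthy state has outlived the toleration window.
		deadline := firstObserved.Add(time.Duration(tolerationSeconds) * time.Second)
		return metav1.Now().After(deadline)
	}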
@@ -115,22 +107,18 @@ func (c *CRBApplicationFailoverController) detectFailure(clusters []string, tole
 }

 func (c *CRBApplicationFailoverController) syncBinding(binding *workv1alpha2.ClusterResourceBinding) (time.Duration, error) {
+	key := types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}
 	tolerationSeconds := binding.Spec.Failover.Application.DecisionConditions.TolerationSeconds
-	resource, err := helper.ConstructClusterWideKey(binding.Spec.Resource)
-	if err != nil {
-		klog.Errorf("failed to get key from binding(%s)'s resource", binding.Name)
-		return 0, err
-	}

 	allClusters := sets.New[string]()
 	for _, cluster := range binding.Spec.Clusters {
 		allClusters.Insert(cluster.Name)
 	}

-	unhealthyClusters := filterIrrelevantClusters(binding.Status.AggregatedStatus, binding.Spec)
-	duration, needEvictClusters := c.detectFailure(unhealthyClusters, tolerationSeconds, resource)
+	unhealthyClusters, others := distinguishUnhealthyClustersWithOthers(binding.Status.AggregatedStatus, binding.Spec)
+	duration, needEvictClusters := c.detectFailure(unhealthyClusters, tolerationSeconds, key)

-	err = c.evictBinding(binding, needEvictClusters)
+	err := c.evictBinding(binding, needEvictClusters)
 	if err != nil {
 		klog.Errorf("Failed to evict binding(%s), err: %v.", binding.Name, err)
 		return 0, err
@@ -142,8 +130,8 @@ func (c *CRBApplicationFailoverController) syncBinding(binding *workv1alpha2.Clu
 		}
 	}

-	// Cleanup clusters that have been evicted or removed in the workloadUnhealthyMap
-	c.workloadUnhealthyMap.deleteIrrelevantClusters(resource, allClusters)
+	// Cleanup the workloadUnhealthyMap: drop clusters whose application is no longer unhealthy, as well as clusters that have been evicted or removed from the binding.
+	c.workloadUnhealthyMap.deleteIrrelevantClusters(key, allClusters, others)

 	return time.Duration(duration) * time.Second, nil
 }
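The hand-built key in syncBinding matches what controller-runtime produces for the same object, since client.ObjectKey is an alias of types.NamespacedName; a sketch of the equivalence (assumes the sigs.k8s.io/controller-runtime/pkg/client import):

	key := types.NamespacedName{Namespace: binding.Namespace, Name: binding.Name}
	same := client.ObjectKeyFromObject(binding) // returns client.ObjectKey
	fmt.Println(same == key)                    // true: alias types compare by value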

pkg/controllers/applicationfailover/rb_application_failover_controller.go

@@ -8,6 +8,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
@@ -25,7 +26,6 @@ import (
 	"github.com/karmada-io/karmada/pkg/features"
 	"github.com/karmada-io/karmada/pkg/resourceinterpreter"
 	"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
-	"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
 	"github.com/karmada-io/karmada/pkg/util/helper"
 )
@@ -52,22 +52,14 @@ func (c *RBApplicationFailoverController) Reconcile(ctx context.Context, req con
 	binding := &workv1alpha2.ResourceBinding{}
 	if err := c.Client.Get(ctx, req.NamespacedName, binding); err != nil {
 		if apierrors.IsNotFound(err) {
-			resource, err := helper.ConstructClusterWideKey(binding.Spec.Resource)
-			if err != nil {
-				return controllerruntime.Result{Requeue: true}, err
-			}
-			c.workloadUnhealthyMap.delete(resource)
+			c.workloadUnhealthyMap.delete(req.NamespacedName)
 			return controllerruntime.Result{}, nil
 		}
 		return controllerruntime.Result{Requeue: true}, err
 	}

 	if !c.bindingFilter(binding) {
-		resource, err := helper.ConstructClusterWideKey(binding.Spec.Resource)
-		if err != nil {
-			return controllerruntime.Result{Requeue: true}, err
-		}
-		c.workloadUnhealthyMap.delete(resource)
+		c.workloadUnhealthyMap.delete(req.NamespacedName)
 		return controllerruntime.Result{}, nil
 	}
@@ -82,13 +74,13 @@ func (c *RBApplicationFailoverController) Reconcile(ctx context.Context, req con
 	return controllerruntime.Result{}, nil
 }

-func (c *RBApplicationFailoverController) detectFailure(clusters []string, tolerationSeconds *int32, resource keys.ClusterWideKey) (int32, []string) {
+func (c *RBApplicationFailoverController) detectFailure(clusters []string, tolerationSeconds *int32, key types.NamespacedName) (int32, []string) {
 	var needEvictClusters []string
 	duration := int32(math.MaxInt32)

 	for _, cluster := range clusters {
-		if !c.workloadUnhealthyMap.hasWorkloadBeenUnhealthy(resource, cluster) {
-			c.workloadUnhealthyMap.setTimeStamp(resource, cluster)
+		if !c.workloadUnhealthyMap.hasWorkloadBeenUnhealthy(key, cluster) {
+			c.workloadUnhealthyMap.setTimeStamp(key, cluster)
 			if duration > *tolerationSeconds {
 				duration = *tolerationSeconds
 			}
@@ -97,7 +89,7 @@ func (c *RBApplicationFailoverController) detectFailure(clusters []string, toler
 			// When the workload in a cluster is in an unhealthy state for more than the tolerance time,
 			// and the cluster has not been in the GracefulEvictionTasks,
 			// the cluster will be added to the list that needs to be evicted.
-			unHealthyTimeStamp := c.workloadUnhealthyMap.getTimeStamp(resource, cluster)
+			unHealthyTimeStamp := c.workloadUnhealthyMap.getTimeStamp(key, cluster)
 			timeNow := metav1.Now()
 			if timeNow.After(unHealthyTimeStamp.Add(time.Duration(*tolerationSeconds) * time.Second)) {
 				needEvictClusters = append(needEvictClusters, cluster)
@@ -115,22 +107,18 @@ func (c *RBApplicationFailoverController) detectFailure(clusters []string, toler
 }

 func (c *RBApplicationFailoverController) syncBinding(binding *workv1alpha2.ResourceBinding) (time.Duration, error) {
+	key := types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}
 	tolerationSeconds := binding.Spec.Failover.Application.DecisionConditions.TolerationSeconds
-	resource, err := helper.ConstructClusterWideKey(binding.Spec.Resource)
-	if err != nil {
-		klog.Errorf("failed to get key from binding(%s)'s resource", binding.Name)
-		return 0, err
-	}

 	allClusters := sets.New[string]()
 	for _, cluster := range binding.Spec.Clusters {
 		allClusters.Insert(cluster.Name)
 	}

-	unhealthyClusters := filterIrrelevantClusters(binding.Status.AggregatedStatus, binding.Spec)
-	duration, needEvictClusters := c.detectFailure(unhealthyClusters, tolerationSeconds, resource)
+	unhealthyClusters, others := distinguishUnhealthyClustersWithOthers(binding.Status.AggregatedStatus, binding.Spec)
+	duration, needEvictClusters := c.detectFailure(unhealthyClusters, tolerationSeconds, key)

-	err = c.evictBinding(binding, needEvictClusters)
+	err := c.evictBinding(binding, needEvictClusters)
 	if err != nil {
 		klog.Errorf("Failed to evict binding(%s/%s), err: %v.", binding.Namespace, binding.Name, err)
 		return 0, err
@@ -142,8 +130,8 @@ func (c *RBApplicationFailoverController) syncBinding(binding *workv1alpha2.Reso
 		}
 	}

-	// Cleanup clusters that have been evicted or removed in the workloadUnhealthyMap
-	c.workloadUnhealthyMap.deleteIrrelevantClusters(resource, allClusters)
+	// Cleanup the workloadUnhealthyMap: drop clusters whose application is no longer unhealthy, as well as clusters that have been evicted or removed from the binding.
+	c.workloadUnhealthyMap.deleteIrrelevantClusters(key, allClusters, others)

 	return time.Duration(duration) * time.Second, nil
 }
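Downstream, the duration returned by syncBinding presumably drives the controller's requeue; the requeue site is outside these hunks, so this is a sketch under that assumption only:

	retryDuration, err := c.syncBinding(binding)
	if err != nil {
		return controllerruntime.Result{}, err
	}
	if retryDuration > 0 {
		// Re-check workload health once the shortest remaining toleration elapses.
		return controllerruntime.Result{RequeueAfter: retryDuration}, nil
	}
	return controllerruntime.Result{}, nil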