use the existing utilerrors.AggregateGoroutines func to collect each cluster's replicas concurrently

Signed-off-by: carlory <baofa.fan@daocloud.io>
This commit is contained in:
carlory 2022-07-18 22:12:39 +08:00
parent f46d90f6b4
commit aca3965545
3 changed files with 12 additions and 83 deletions

View File

@ -3,10 +3,10 @@ package client
import (
"context"
"fmt"
"sync"
"time"
"google.golang.org/grpc/metadata"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
@ -127,22 +127,18 @@ func getClusterReplicasConcurrently(parentCtx context.Context, clusters []string
ctx, cancel := context.WithTimeout(parentCtx, timeout)
defer cancel()
availableTargetClusters := make([]workv1alpha2.TargetCluster, len(clusters))
var wg sync.WaitGroup
errChan := make(chan error, len(clusters))
for i := range clusters {
wg.Add(1)
go func(idx int, cluster string) {
defer wg.Done()
replicas, err := getClusterReplicas(ctx, cluster)
clusterReplicas := make([]workv1alpha2.TargetCluster, len(clusters))
funcs := make([]func() error, len(clusters))
for index, cluster := range clusters {
localIndex, localCluster := index, cluster
funcs[index] = func() error {
replicas, err := getClusterReplicas(ctx, localCluster)
if err != nil {
errChan <- err
return err
}
availableTargetClusters[idx] = workv1alpha2.TargetCluster{Name: cluster, Replicas: replicas}
}(i, clusters[i])
clusterReplicas[localIndex] = workv1alpha2.TargetCluster{Name: localCluster, Replicas: replicas}
return nil
}
}
wg.Wait()
return availableTargetClusters, util.AggregateErrors(errChan)
return clusterReplicas, utilerrors.AggregateGoroutines(funcs...)
}

View File

@ -1,22 +0,0 @@
package util
import utilerrors "k8s.io/apimachinery/pkg/util/errors"
// AggregateErrors will receive all errors from the channel and stuff all non-nil errors
// into the returned Aggregate.
func AggregateErrors(ch <-chan error) error {
var errs []error
for {
drained := false
select {
case err := <-ch:
errs = append(errs, err)
default:
drained = true
}
if drained {
break
}
}
return utilerrors.NewAggregate(errs)
}

View File

@ -1,45 +0,0 @@
package util
import (
"fmt"
"reflect"
"testing"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
)
func TestAggregateErrors(t *testing.T) {
err1 := fmt.Errorf("error1")
err2 := fmt.Errorf("error2")
channel := make(chan error, 2)
channel <- err1
channel <- err2
tests := []struct {
name string
input <-chan error
expected error
}{
{
name: "nil channel",
input: nil,
expected: nil,
},
{
name: "channel has no error",
input: make(chan error, 1),
expected: nil,
},
{
name: "channel has 2 errors",
input: channel,
expected: utilerrors.NewAggregate([]error{err1, err2}),
},
}
for _, test := range tests {
if got := AggregateErrors(test.input); !reflect.DeepEqual(got, test.expected) {
t.Errorf("Test %s failed: expected %v, but got %v", test.name, test.expected, got)
}
}
}