From 11dbc9b97f21c2aa54d831488202dc26e1644e57 Mon Sep 17 00:00:00 2001 From: justinsb Date: Tue, 14 Mar 2023 12:45:45 +0000 Subject: [PATCH] prunev2: find resources in parallel To improve wall-clock speed, we run list operations in parallel. This particularly helps when the round-trip time is high. We issue requests as quickly as possible, kube-apiservers should all have priority and fairness at this point and we don't want to duplicate/fight that system. Kubernetes-commit: 82eee59d0feb4b303e6ef78ebb7ec646a059f266 --- pkg/cmd/apply/applyset_pruner.go | 57 +++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 12 deletions(-) diff --git a/pkg/cmd/apply/applyset_pruner.go b/pkg/cmd/apply/applyset_pruner.go index 50e7d0f2..94827cde 100644 --- a/pkg/cmd/apply/applyset_pruner.go +++ b/pkg/cmd/apply/applyset_pruner.go @@ -19,6 +19,7 @@ package apply import ( "context" "fmt" + "sync" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -65,8 +66,17 @@ func (p *PruneObject) String() string { // FindAllObjectsToPrune returns the list of objects that will be pruned. // Calling this instead of Prune can be useful for dry-run / diff behaviour. func (a *ApplySet) FindAllObjectsToPrune(ctx context.Context, dynamicClient dynamic.Interface, visitedUids sets.Set[types.UID]) ([]PruneObject, error) { - var allObjects []PruneObject - // TODO: Run discovery in parallel (and maybe in consistent order?) + type task struct { + namespace string + restMapping *meta.RESTMapping + + err error + results []PruneObject + } + var tasks []*task + + // We run discovery in parallel, in as many goroutines as priority and fairness will allow + // (We don't expect many requests in real-world scenarios - maybe tens, unlikely to be hundreds) for _, restMapping := range a.AllPrunableResources() { switch restMapping.Scope.Name() { case meta.RESTScopeNameNamespace: @@ -75,25 +85,48 @@ func (a *ApplySet) FindAllObjectsToPrune(ctx context.Context, dynamicClient dyna // Just double-check because otherwise we get cryptic error messages return nil, fmt.Errorf("unexpectedly encountered empty namespace during prune of namespace-scoped resource %v", restMapping.GroupVersionKind) } - pruneObjects, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUids, namespace, restMapping) - if err != nil { - return nil, fmt.Errorf("listing %v objects for prune: %w", restMapping.GroupVersionKind.String(), err) - } - allObjects = append(allObjects, pruneObjects...) + tasks = append(tasks, &task{ + namespace: namespace, + restMapping: restMapping, + }) } case meta.RESTScopeNameRoot: - pruneObjects, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUids, metav1.NamespaceNone, restMapping) - if err != nil { - return nil, fmt.Errorf("listing %v objects for prune: %w", restMapping.GroupVersionKind.String(), err) - } - allObjects = append(allObjects, pruneObjects...) + tasks = append(tasks, &task{ + restMapping: restMapping, + }) default: return nil, fmt.Errorf("unhandled scope %q", restMapping.Scope.Name()) } } + var wg sync.WaitGroup + + for i := range tasks { + task := tasks[i] + wg.Add(1) + go func() { + defer wg.Done() + + results, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUids, task.namespace, task.restMapping) + if err != nil { + task.err = fmt.Errorf("listing %v objects for pruning: %w", task.restMapping.GroupVersionKind.String(), err) + } else { + task.results = results + } + }() + } + // Wait for all the goroutines to finish + wg.Wait() + + var allObjects []PruneObject + for _, task := range tasks { + if task.err != nil { + return nil, task.err + } + allObjects = append(allObjects, task.results...) + } return allObjects, nil }