prunev2: find resources in parallel

To improve wall-clock speed, we run the list operations in parallel. This
particularly helps when the round-trip time to the apiserver is high.

We issue requests as quickly as possible; kube-apiservers should all have
API Priority and Fairness enabled at this point, and we don't want to
duplicate or fight that system.

Kubernetes-commit: 82eee59d0feb4b303e6ef78ebb7ec646a059f266
Authored by justinsb on 2023-03-14 12:45:45 +00:00; committed by Kubernetes Publisher
parent 67016c7205
commit 11dbc9b97f
1 changed file with 45 additions and 12 deletions
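
The change below uses a fan-out/collect pattern: every (namespace, resource) pair becomes a task, each task gets its own goroutine that records results or an error on its own struct (so no mutex is needed), and a sync.WaitGroup joins them before any error is inspected. A minimal standalone sketch of that pattern, where listResource is a hypothetical stand-in for the network-bound list call (it is not part of the commit):

package main

import (
	"context"
	"fmt"
	"sync"
)

// task mirrors the commit's pattern: each goroutine writes only to its own
// task, so no locking is needed; errors are surfaced after the join.
type task struct {
	name    string
	err     error
	results []string
}

// listResource is a hypothetical stand-in for an expensive, network-bound
// list call (in the commit, a.findObjectsToPrune).
func listResource(ctx context.Context, name string) ([]string, error) {
	return []string{name + "/obj1", name + "/obj2"}, nil
}

func main() {
	ctx := context.Background()
	tasks := []*task{{name: "deployments"}, {name: "configmaps"}}

	var wg sync.WaitGroup
	for i := range tasks {
		t := tasks[i] // capture a stable pointer for the goroutine
		wg.Add(1)
		go func() {
			defer wg.Done()
			t.results, t.err = listResource(ctx, t.name)
		}()
	}
	wg.Wait() // join: every goroutine has finished before results are read

	var all []string
	for _, t := range tasks {
		if t.err != nil {
			fmt.Println("error:", t.err)
			return
		}
		all = append(all, t.results...)
	}
	fmt.Println(all)
}
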


@@ -19,6 +19,7 @@ package apply
 import (
 	"context"
 	"fmt"
+	"sync"
 
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -65,8 +66,17 @@ func (p *PruneObject) String() string {
 // FindAllObjectsToPrune returns the list of objects that will be pruned.
 // Calling this instead of Prune can be useful for dry-run / diff behaviour.
 func (a *ApplySet) FindAllObjectsToPrune(ctx context.Context, dynamicClient dynamic.Interface, visitedUids sets.Set[types.UID]) ([]PruneObject, error) {
-	var allObjects []PruneObject
-	// TODO: Run discovery in parallel (and maybe in consistent order?)
+	type task struct {
+		namespace   string
+		restMapping *meta.RESTMapping
+
+		err     error
+		results []PruneObject
+	}
+	var tasks []*task
+
+	// We run discovery in parallel, in as many goroutines as priority and fairness will allow
+	// (We don't expect many requests in real-world scenarios - maybe tens, unlikely to be hundreds)
 	for _, restMapping := range a.AllPrunableResources() {
 		switch restMapping.Scope.Name() {
 		case meta.RESTScopeNameNamespace:
@@ -75,25 +85,48 @@ func (a *ApplySet) FindAllObjectsToPrune(ctx context.Context, dynamicClient dyna
 					// Just double-check because otherwise we get cryptic error messages
 					return nil, fmt.Errorf("unexpectedly encountered empty namespace during prune of namespace-scoped resource %v", restMapping.GroupVersionKind)
 				}
-				pruneObjects, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUids, namespace, restMapping)
-				if err != nil {
-					return nil, fmt.Errorf("listing %v objects for prune: %w", restMapping.GroupVersionKind.String(), err)
-				}
-				allObjects = append(allObjects, pruneObjects...)
+				tasks = append(tasks, &task{
+					namespace:   namespace,
+					restMapping: restMapping,
+				})
 			}
 
 		case meta.RESTScopeNameRoot:
-			pruneObjects, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUids, metav1.NamespaceNone, restMapping)
-			if err != nil {
-				return nil, fmt.Errorf("listing %v objects for prune: %w", restMapping.GroupVersionKind.String(), err)
-			}
-			allObjects = append(allObjects, pruneObjects...)
+			tasks = append(tasks, &task{
+				restMapping: restMapping,
+			})
 
 		default:
 			return nil, fmt.Errorf("unhandled scope %q", restMapping.Scope.Name())
 		}
 	}
+
+	var wg sync.WaitGroup
+	for i := range tasks {
+		task := tasks[i]
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			results, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUids, task.namespace, task.restMapping)
+			if err != nil {
+				task.err = fmt.Errorf("listing %v objects for pruning: %w", task.restMapping.GroupVersionKind.String(), err)
+			} else {
+				task.results = results
+			}
+		}()
+	}
+	// Wait for all the goroutines to finish
+	wg.Wait()
+
+	var allObjects []PruneObject
+	for _, task := range tasks {
+		if task.err != nil {
+			return nil, task.err
+		}
+		allObjects = append(allObjects, task.results...)
+	}
 	return allObjects, nil
 }
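
A note on the design choice: errors are captured per task and only surfaced after wg.Wait(), so one failing list call neither cancels the others nor limits how many requests are in flight; the commit deliberately leaves throttling to server-side API Priority and Fairness. If early cancellation or a client-side concurrency cap were wanted instead, golang.org/x/sync/errgroup gives the same shape. A hypothetical alternative sketch, reusing the names from the function above (this is not what the commit does):

	// Hypothetical alternative using errgroup: the first error cancels ctx
	// for the remaining list calls, and SetLimit caps client-side concurrency
	// instead of relying solely on server-side priority and fairness.
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(10)
	for i := range tasks {
		task := tasks[i]
		g.Go(func() error {
			results, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUids, task.namespace, task.restMapping)
			if err != nil {
				return fmt.Errorf("listing %v objects for pruning: %w", task.restMapping.GroupVersionKind.String(), err)
			}
			task.results = results
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}

The per-task error field in the committed version trades early cancellation for simplicity: every list call runs to completion, and the first recorded error is returned after the join.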