Only process server updates on workloads affected by the server (#12017)

We the destination controller's workload watcher receives an update for any Server resource, it recomputes opaqueness for every workload.  This is because the Server update may have changed opaqueness for that workload.  However, this is very CPU intensive for the destination controller, especially during resyncs when we get Server updates for every Server resource in the cluster.

Instead, we only need to recompute opaqueness for workloads that are selected by the old version of the Server or by the new version of the Server.  If a workload is not selected by either the new or old version of the Server, then the Server update cannot have changed the workload's opaqueness.

Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
Alex Leong 2024-02-01 10:42:50 -08:00 committed by GitHub
parent 0b94f62682
commit 3902b339e7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 61 additions and 7 deletions

View File

@ -20,6 +20,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
)
@ -92,8 +94,8 @@ func NewWorkloadWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logg
}
_, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ww.updateServers,
DeleteFunc: ww.updateServers,
AddFunc: ww.addOrDeleteServer,
DeleteFunc: ww.addOrDeleteServer,
UpdateFunc: ww.updateServer,
})
if err != nil {
@ -310,27 +312,51 @@ func (ww *WorkloadWatcher) updateServer(oldObj interface{}, newObj interface{})
oldUpdated := latestUpdated(oldServer.ManagedFields)
updated := latestUpdated(newServer.ManagedFields)
if !updated.IsZero() && updated != oldUpdated {
delta := time.Since(updated)
serverInformerLag.Observe(delta.Seconds())
}
ww.updateServers(newObj)
ww.updateServers(oldServer, newServer)
}
func (ww *WorkloadWatcher) addOrDeleteServer(obj interface{}) {
server, ok := obj.(*v1beta2.Server)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
ww.log.Errorf("Couldn't get object from DeletedFinalStateUnknown %#v", obj)
return
}
server, ok = tombstone.Obj.(*v1beta2.Server)
if !ok {
ww.log.Errorf("DeletedFinalStateUnknown contained object that is not a Server %#v", obj)
return
}
}
ww.updateServers(server)
}
// updateServer triggers an Update() call to the listeners of the workloadPublishers
// whose pod matches the Server's podSelector or whose externalworkload matches
// the Server's externalworkload selection. This function is an event handler
// so it cannot block.
func (ww *WorkloadWatcher) updateServers(_ any) {
// whose pod matches the any of the Servers' podSelector or whose
// externalworkload matches any of the Servers' externalworkload selection. This
// function is an event handler so it cannot block.
func (ww *WorkloadWatcher) updateServers(servers ...*v1beta2.Server) {
ww.mu.RLock()
defer ww.mu.RUnlock()
for _, wp := range ww.publishers {
var opaquePorts map[uint32]struct{}
if wp.pod != nil {
if !ww.isPodSelectedByAny(wp.pod, servers...) {
continue
}
opaquePorts = GetAnnotatedOpaquePorts(wp.pod, ww.defaultOpaquePorts)
} else if wp.externalWorkload != nil {
if !ww.isExternalWorkloadSelectedByAny(wp.externalWorkload, servers...) {
continue
}
opaquePorts = GetAnnotatedOpaquePortsForExternalWorkload(wp.externalWorkload, ww.defaultOpaquePorts)
} else {
continue
@ -369,6 +395,34 @@ func (ww *WorkloadWatcher) updateServers(_ any) {
}
}
func (ww *WorkloadWatcher) isPodSelectedByAny(pod *corev1.Pod, servers ...*v1beta2.Server) bool {
for _, s := range servers {
selector, err := metav1.LabelSelectorAsSelector(s.Spec.PodSelector)
if err != nil {
ww.log.Errorf("failed to parse PodSelector of Server %s.%s: %q", s.GetName(), s.GetNamespace(), err)
continue
}
if selector.Matches(labels.Set(pod.Labels)) {
return true
}
}
return false
}
func (ww *WorkloadWatcher) isExternalWorkloadSelectedByAny(ew *ext.ExternalWorkload, servers ...*v1beta2.Server) bool {
for _, s := range servers {
selector, err := metav1.LabelSelectorAsSelector(s.Spec.ExternalWorkloadSelector)
if err != nil {
ww.log.Errorf("failed to parse ExternalWorkloadSelector of Server %s.%s: %q", s.GetName(), s.GetNamespace(), err)
continue
}
if selector.Matches(labels.Set(ew.Labels)) {
return true
}
}
return false
}
// getOrNewWorkloadPublisher returns the workloadPublisher for the given target if it
// exists. Otherwise, it creates a new one and returns it.
func (ww *WorkloadWatcher) getOrNewWorkloadPublisher(service *ServiceID, hostname, ip string, port Port) (*workloadPublisher, error) {