checks: use caching with opaqueports check (#6292)

Fixes #6272

The opaqueports check is prone to failing with `context deadline exceeded`,
as it performs numerous k8s API requests.

This PR updates the pre-fetching logic to instead use
`controller/k8s`, which provides a wrapper around `pkg/k8s` with
caching by using shared informers underneath.
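
For a sense of what the shared-informer caching buys, here is a minimal
client-go sketch (not the linkerd wrapper itself; the kubeconfig path,
namespace, and resync period below are illustrative assumptions):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the default kubeconfig (illustrative; the real
	// check reuses the client already held by the health checker).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// One shared informer factory: each resource kind is listed/watched once,
	// and every later read goes through the in-memory cache via a lister.
	factory := informers.NewSharedInformerFactory(clientset, 10*time.Minute)
	svcLister := factory.Core().V1().Services().Lister()

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	// Repeated List/Get calls hit the cache, not the API server, so a check
	// that walks many services and pods no longer fires one request per object.
	svcs, err := svcLister.Services("default").List(labels.Everything())
	if err != nil {
		panic(err)
	}
	fmt.Printf("services in default (from cache): %d\n", len(svcs))
}
```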

This commit includes the following changes:
- Update `checkMisconfiguredOpaquePortAnnotations` to use
  `controllerk8s.KubeAPI` instead of `hc.kubeAPI`
- Update the `kubeAPI.Sync` fn to check whether the sp and ts
  shared informers are nil before starting them, as they can be
  nil in cases like this where they are not needed

We had to use `controllerK8s.NewAPI` for the initialization
instead of `controllerK8s.InitializeAPI` so that `hc.kubeAPI` can
be passed in; this keeps unit testing working, as `hc.kubeAPI` is
how we pass the fake resources in unit tests.
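
As a rough illustration of why injecting the client matters for tests, here
is a sketch using standard client-go fakes (the package, test name, namespace,
and service below are made up; the actual linkerd test helpers differ):

```go
package example

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

// Because the wrapper accepts an existing client, a fake clientset seeded
// with test resources can drive the same informer/lister code paths that
// run against a real cluster.
func TestListerSeesFakeResources(t *testing.T) {
	clientset := fake.NewSimpleClientset(&corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "linkerd-test"},
	})

	factory := informers.NewSharedInformerFactory(clientset, 0)
	svcLister := factory.Core().V1().Services().Lister()

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	svcs, err := svcLister.Services("linkerd-test").List(labels.Everything())
	if err != nil {
		t.Fatal(err)
	}
	if len(svcs) != 1 {
		t.Fatalf("expected 1 cached service, got %d", len(svcs))
	}
}
```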

Signed-off-by: Tarun Pothulapati <tarunpothulapati@outlook.com>
Tarun Pothulapati 2021-06-23 10:28:15 +05:30 committed by GitHub
parent 06ef634a9b
commit ebbb3182a9
3 changed files with 24 additions and 11 deletions


@@ -260,8 +260,14 @@ func NewAPI(
 // Sync waits for all informers to be synced.
 func (api *API) Sync(stopCh <-chan struct{}) {
 	api.sharedInformers.Start(stopCh)
-	api.spSharedInformers.Start(stopCh)
-	api.tsSharedInformers.Start(stopCh)
+	if api.spSharedInformers != nil {
+		api.spSharedInformers.Start(stopCh)
+	}
+	if api.tsSharedInformers != nil {
+		api.tsSharedInformers.Start(stopCh)
+	}
 	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
 	defer cancel()


@@ -13,6 +13,7 @@ import (
 	"time"
 	configPb "github.com/linkerd/linkerd2/controller/gen/config"
+	controllerK8s "github.com/linkerd/linkerd2/controller/k8s"
 	l5dcharts "github.com/linkerd/linkerd2/pkg/charts/linkerd2"
 	"github.com/linkerd/linkerd2/pkg/config"
 	"github.com/linkerd/linkerd2/pkg/identity"
@@ -28,6 +29,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	yamlDecoder "k8s.io/apimachinery/pkg/util/yaml"
@@ -2263,21 +2265,26 @@ func checkResources(resourceName string, objects []runtime.Object, expectedNames
 // Check if there's a pod with the "opaque ports" annotation defined but a
 // service selecting the aforementioned pod doesn't define it
 func (hc *HealthChecker) checkMisconfiguredOpaquePortAnnotations(ctx context.Context) error {
-	services, err := hc.GetServices(ctx)
+	// Initialize and sync the kubernetes API
+	// This is used instead of `hc.kubeAPI` to limit multiple k8s API requests
+	// and use the caching logic in the shared informers
+	// TODO: move the shared informer code out of `controller/`, and into `pkg` to simplify the dependency tree.
+	kubeAPI := controllerK8s.NewAPI(hc.kubeAPI, nil, nil, controllerK8s.Endpoint, controllerK8s.Pod, controllerK8s.Svc)
+	kubeAPI.Sync(ctx.Done())
+	services, err := kubeAPI.Svc().Lister().Services(hc.DataPlaneNamespace).List(labels.Everything())
 	if err != nil {
 		return err
 	}
 	var errStrings []string
 	for _, service := range services {
 		if service.Spec.ClusterIP == "None" {
 			// skip headless services; they're handled differently
 			continue
 		}
-		endpoint, err := hc.kubeAPI.CoreV1().Endpoints(service.Namespace).Get(ctx, service.Name, metav1.GetOptions{})
+		endpoint, err := kubeAPI.Endpoint().Lister().Endpoints(service.Namespace).Get(service.Name)
 		if err != nil {
 			return err
 		}
@@ -2286,7 +2293,7 @@ func (hc *HealthChecker) checkMisconfiguredOpaquePortAnnotations(ctx context.Con
 		for _, subset := range endpoint.Subsets {
 			for _, addr := range subset.Addresses {
 				if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
-					pod, err := hc.kubeAPI.CoreV1().Pods(service.Namespace).Get(ctx, addr.TargetRef.Name, metav1.GetOptions{})
+					pod, err := kubeAPI.Pod().Lister().Pods(service.Namespace).Get(addr.TargetRef.Name)
 					if err != nil {
 						return err
 					}
@@ -2310,7 +2317,7 @@ func (hc *HealthChecker) checkMisconfiguredOpaquePortAnnotations(ctx context.Con
 	return nil
 }
-func misconfiguredOpaquePortAnnotationsInService(service corev1.Service, pods []*corev1.Pod) error {
+func misconfiguredOpaquePortAnnotationsInService(service *corev1.Service, pods []*corev1.Pod) error {
 	for _, pod := range pods {
 		if err := misconfiguredOpaqueAnnotation(service, pod); err != nil {
 			return err
@@ -2319,7 +2326,7 @@ func misconfiguredOpaquePortAnnotationsInService(service corev1.Service, pods []
 	return nil
 }
-func misconfiguredOpaqueAnnotation(service corev1.Service, pod *corev1.Pod) error {
+func misconfiguredOpaqueAnnotation(service *corev1.Service, pod *corev1.Pod) error {
 	svcAnnotation, svcAnnotationOk := service.Annotations[k8s.ProxyOpaquePortsAnnotation]
 	podAnnotation, podAnnotationOk := pod.Annotations[k8s.ProxyOpaquePortsAnnotation]


@@ -3734,9 +3734,9 @@ subsets:
 			}
 			err = hc.checkMisconfiguredOpaquePortAnnotations(context.Background())
 			if err == nil && tc.expected != nil {
-				t.Fatalf("Expected check to be successful, got: %s", err)
+				t.Fatalf("Expected check to fail with %s", tc.expected.Error())
 			}
-			if err != nil {
+			if err != nil && tc.expected != nil {
 				if err.Error() != tc.expected.Error() {
 					t.Fatalf("Expected error: %s, received: %s", tc.expected, err)
 				}