mirror of https://github.com/linkerd/linkerd2.git
222 lines
6.0 KiB
Go
222 lines
6.0 KiB
Go
package watcher
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
|
|
ext "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
|
|
"github.com/linkerd/linkerd2/controller/k8s"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/util/intstr"
|
|
"k8s.io/client-go/tools/cache"
|
|
)
|
|
|
|
const (
|
|
// PodIPIndex is the key for the index based on Pod IPs
|
|
PodIPIndex = "ip"
|
|
// HostIPIndex is the key for the index based on Host IP of pods with host network enabled
|
|
HostIPIndex = "hostIP"
|
|
// ExternalWorkloadIPIndex is the key for the index based on IP of externalworkloads
|
|
ExternalWorkloadIPIndex = "externalWorkloadIP"
|
|
)
|
|
|
|
type (
|
|
// IPPort holds the IP and port for some destination
|
|
IPPort struct {
|
|
IP string
|
|
Port Port
|
|
}
|
|
|
|
// ID is a namespace-qualified name.
|
|
ID struct {
|
|
Namespace string
|
|
Name string
|
|
|
|
// Only used for PodID
|
|
IPFamily corev1.IPFamily
|
|
}
|
|
// ServiceID is the namespace-qualified name of a service.
|
|
ServiceID = ID
|
|
// PodID is the namespace-qualified name of a pod.
|
|
PodID = ID
|
|
// ProfileID is the namespace-qualified name of a service profile.
|
|
ProfileID = ID
|
|
// PodID is the namespace-qualified name of an ExternalWorkload.
|
|
ExternalWorkloadID = ID
|
|
|
|
// Port is a numeric port.
|
|
Port = uint32
|
|
namedPort = intstr.IntOrString
|
|
|
|
// InvalidService is an error which indicates that the authority is not a
|
|
// valid service.
|
|
InvalidService struct {
|
|
authority string
|
|
}
|
|
)
|
|
|
|
// Labels returns the labels for prometheus metrics associated to the service
|
|
func (id ServiceID) Labels() prometheus.Labels {
|
|
return prometheus.Labels{"namespace": id.Namespace, "name": id.Name}
|
|
}
|
|
|
|
func (is InvalidService) Error() string {
|
|
return fmt.Sprintf("Invalid k8s service %s", is.authority)
|
|
}
|
|
|
|
func invalidService(authority string) InvalidService {
|
|
return InvalidService{authority}
|
|
}
|
|
|
|
func (i ID) String() string {
|
|
return fmt.Sprintf("%s/%s", i.Namespace, i.Name)
|
|
}
|
|
|
|
// InitializeIndexers is used to initialize indexers on k8s informers, to be used across watchers
|
|
func InitializeIndexers(k8sAPI *k8s.API) error {
|
|
err := k8sAPI.Svc().Informer().AddIndexers(cache.Indexers{PodIPIndex: func(obj interface{}) ([]string, error) {
|
|
svc, ok := obj.(*corev1.Service)
|
|
if !ok {
|
|
return nil, errors.New("object is not a service")
|
|
}
|
|
|
|
if len(svc.Spec.ClusterIPs) != 0 {
|
|
return svc.Spec.ClusterIPs, nil
|
|
}
|
|
|
|
if svc.Spec.ClusterIP != "" {
|
|
return []string{svc.Spec.ClusterIP}, nil
|
|
}
|
|
|
|
return nil, nil
|
|
}})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("could not create an indexer for services: %w", err)
|
|
}
|
|
|
|
err = k8sAPI.Pod().Informer().AddIndexers(cache.Indexers{PodIPIndex: func(obj interface{}) ([]string, error) {
|
|
if pod, ok := obj.(*corev1.Pod); ok {
|
|
// Pods that run in the host network are indexed by the host IP
|
|
// indexer in the IP watcher; they should be skipped by the pod
|
|
// IP indexer which is responsible only for indexing pod network
|
|
// pods.
|
|
if pod.Spec.HostNetwork {
|
|
return nil, nil
|
|
}
|
|
ips := []string{}
|
|
for _, pip := range pod.Status.PodIPs {
|
|
if pip.IP != "" {
|
|
ips = append(ips, pip.IP)
|
|
}
|
|
}
|
|
if len(ips) == 0 && pod.Status.PodIP != "" {
|
|
ips = append(ips, pod.Status.PodIP)
|
|
}
|
|
return ips, nil
|
|
}
|
|
return nil, fmt.Errorf("object is not a pod")
|
|
}})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("could not create an indexer for pods: %w", err)
|
|
}
|
|
|
|
err = k8sAPI.Pod().Informer().AddIndexers(cache.Indexers{HostIPIndex: func(obj interface{}) ([]string, error) {
|
|
pod, ok := obj.(*corev1.Pod)
|
|
if !ok {
|
|
return nil, errors.New("object is not a pod")
|
|
}
|
|
|
|
ips := []string{}
|
|
for _, hip := range pod.Status.HostIPs {
|
|
ips = append(ips, hip.IP)
|
|
}
|
|
if len(ips) == 0 && pod.Status.HostIP != "" {
|
|
ips = append(ips, pod.Status.HostIP)
|
|
}
|
|
if len(ips) == 0 {
|
|
return []string{}, nil
|
|
}
|
|
|
|
// If the pod is reachable from the host network, then for
|
|
// each of its containers' ports that exposes a host port, add
|
|
// that hostIP:hostPort endpoint to the indexer.
|
|
addrs := []string{}
|
|
for _, c := range pod.Spec.Containers {
|
|
for _, p := range c.Ports {
|
|
if p.HostPort == 0 {
|
|
continue
|
|
}
|
|
for _, ip := range ips {
|
|
addrs = append(addrs, net.JoinHostPort(ip, fmt.Sprintf("%d", p.HostPort)))
|
|
}
|
|
}
|
|
}
|
|
return addrs, nil
|
|
}})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("could not create an indexer for pods: %w", err)
|
|
}
|
|
|
|
err = k8sAPI.ExtWorkload().Informer().AddIndexers(cache.Indexers{ExternalWorkloadIPIndex: func(obj interface{}) ([]string, error) {
|
|
ew, ok := obj.(*ext.ExternalWorkload)
|
|
if !ok {
|
|
return nil, errors.New("object is not an externalworkload")
|
|
}
|
|
|
|
addrs := []string{}
|
|
for _, ip := range ew.Spec.WorkloadIPs {
|
|
for _, port := range ew.Spec.Ports {
|
|
addrs = append(addrs, net.JoinHostPort(ip.Ip, fmt.Sprintf("%d", port.Port)))
|
|
}
|
|
}
|
|
return addrs, nil
|
|
}})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("could not create an indexer for externalworkloads: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func getIndexedPods(k8sAPI *k8s.API, indexName string, key string) ([]*corev1.Pod, error) {
|
|
objs, err := k8sAPI.Pod().Informer().GetIndexer().ByIndex(indexName, key)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed getting %s indexed pods: %w", indexName, err)
|
|
}
|
|
pods := make([]*corev1.Pod, 0)
|
|
for _, obj := range objs {
|
|
pod := obj.(*corev1.Pod)
|
|
if !podNotTerminating(pod) {
|
|
continue
|
|
}
|
|
pods = append(pods, pod)
|
|
}
|
|
return pods, nil
|
|
}
|
|
|
|
func getIndexedExternalWorkloads(k8sAPI *k8s.API, indexName string, key string) ([]*ext.ExternalWorkload, error) {
|
|
objs, err := k8sAPI.ExtWorkload().Informer().GetIndexer().ByIndex(indexName, key)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed getting %s indexed externalworkloads: %w", indexName, err)
|
|
}
|
|
workloads := make([]*ext.ExternalWorkload, 0)
|
|
for _, obj := range objs {
|
|
workload := obj.(*ext.ExternalWorkload)
|
|
workloads = append(workloads, workload)
|
|
}
|
|
return workloads, nil
|
|
}
|
|
|
|
func podNotTerminating(pod *corev1.Pod) bool {
|
|
phase := pod.Status.Phase
|
|
podTerminated := phase == corev1.PodSucceeded || phase == corev1.PodFailed
|
|
podTerminating := pod.DeletionTimestamp != nil
|
|
return !podTerminating && !podTerminated
|
|
}
|