mirror of https://github.com/linkerd/linkerd2.git
Add support for ExternalWorkloads in endpoint profiles (#11952)
When a meshed client attempts to establish a connection directly to the workload IP of an ExternalWorkload, the destination controller should return an endpoint profile for that ExternalWorkload with a single endpoint and the metadata associated with that ExternalWorkload, including:

* mesh TLS identity
* workload metric labels
* opaque / protocol hints

Signed-off-by: Alex Leong <alex@buoyant.io>
parent 52288e29e4
commit 65f13de2ce
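Before the per-file hunks below, a brief orientation sketch may help. It mirrors the decision described in the commit message for a single ExternalWorkload-backed endpoint: take the mesh TLS identity from the workload spec, derive metric labels from its metadata, and mark the dialed port opaque if the opaque-ports annotation names it. The workload type, field names, and metric-label keys here are hypothetical stand-ins for illustration, not the linkerd2 API.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// workload is a hypothetical stand-in for the fields the destination
// controller reads off an ExternalWorkload when building an endpoint profile.
type workload struct {
	Name, Namespace string
	MeshTLSIdentity string            // spec.meshTls.identity
	Annotations     map[string]string // e.g. config.linkerd.io/opaque-ports
}

// profileMetadata assembles the three pieces of metadata called out in the
// commit message: TLS identity, metric labels (label keys are illustrative),
// and an opaque-protocol flag for the dialed port.
func profileMetadata(w workload, port uint32) (identity string, labels map[string]string, opaque bool) {
	labels = map[string]string{
		"namespace":         w.Namespace,
		"external_workload": w.Name,
	}
	for _, p := range strings.Split(w.Annotations["config.linkerd.io/opaque-ports"], ",") {
		if n, err := strconv.ParseUint(strings.TrimSpace(p), 10, 32); err == nil && uint32(n) == port {
			opaque = true
		}
	}
	return w.MeshTLSIdentity, labels, opaque
}

func main() {
	w := workload{
		Name:            "my-cool-workload",
		Namespace:       "ns",
		MeshTLSIdentity: "spiffe://some-domain/cool",
		Annotations:     map[string]string{"config.linkerd.io/opaque-ports": "4242"},
	}
	id, labels, opaque := profileMetadata(w, 4242)
	fmt.Println(id, labels, opaque) // spiffe://some-domain/cool map[...] true
}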
@@ -111,7 +111,12 @@ func (ept *endpointProfileTranslator) Update(address *watcher.Address) error {
 }
 
 func (ept *endpointProfileTranslator) update(address *watcher.Address) {
-	opaquePorts := watcher.GetAnnotatedOpaquePorts(address.Pod, ept.defaultOpaquePorts)
+	var opaquePorts map[uint32]struct{}
+	if address.Pod != nil {
+		opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, ept.defaultOpaquePorts)
+	} else {
+		opaquePorts = watcher.GetAnnotatedOpaquePortsForExternalWorkload(address.ExternalWorkload, ept.defaultOpaquePorts)
+	}
 	endpoint, err := ept.createEndpoint(*address, opaquePorts)
 	if err != nil {
 		ept.log.Errorf("Failed to create endpoint for %s:%d: %s",
@@ -139,7 +144,13 @@ func (ept *endpointProfileTranslator) update(address *watcher.Address) {
 }
 
 func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, opaquePorts map[uint32]struct{}) (*pb.WeightedAddr, error) {
-	weightedAddr, err := createWeightedAddr(address, opaquePorts, ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS)
+	var weightedAddr *pb.WeightedAddr
+	var err error
+	if address.ExternalWorkload != nil {
+		weightedAddr, err = createWeightedAddrForExternalWorkload(address, opaquePorts)
+	} else {
+		weightedAddr, err = createWeightedAddr(address, opaquePorts, ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS)
+	}
 	if err != nil {
 		return nil, err
 	}
@@ -148,6 +159,8 @@ func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, op
 	// metadata, so it needs to be special-cased.
 	if address.Pod != nil {
 		weightedAddr.MetricLabels["namespace"] = address.Pod.Namespace
+	} else if address.ExternalWorkload != nil {
+		weightedAddr.MetricLabels["namespace"] = address.ExternalWorkload.Namespace
 	}
 
 	return weightedAddr, err
@@ -13,7 +13,6 @@ import (
 	"github.com/linkerd/linkerd2/controller/k8s"
 	"github.com/linkerd/linkerd2/pkg/addr"
 	pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
-	"github.com/linkerd/linkerd2/pkg/util"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	logging "github.com/sirupsen/logrus"
@@ -385,7 +384,7 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
 				continue
 			}
 		} else if address.ExternalWorkload != nil {
-			opaquePorts = getAnnotatedOpaquePortsForExternalWorkload(address.ExternalWorkload, et.defaultOpaquePorts)
+			opaquePorts = watcher.GetAnnotatedOpaquePortsForExternalWorkload(address.ExternalWorkload, et.defaultOpaquePorts)
			wa, err = createWeightedAddrForExternalWorkload(address, opaquePorts)
 			if err != nil {
 				et.log.Errorf("Failed to translate ExternalWorkload endpoints to weighted addr: %s", err)
@@ -699,53 +698,3 @@ func getInboundPortFromExternalWorkload(ewSpec *ewv1alpha1.ExternalWorkloadSpec)
 
 	return 0, fmt.Errorf("failed to find %s port for given ExternalWorkloadSpec", pkgK8s.ProxyPortName)
 }
-
-// getAnnotatedOpaquePortsForExternalWorkload returns the opaque ports for the external workload given its
-// annotations, or the default opaque ports if it's not annotated
-func getAnnotatedOpaquePortsForExternalWorkload(ew *ewv1alpha1.ExternalWorkload, defaultPorts map[uint32]struct{}) map[uint32]struct{} {
-	if ew == nil {
-		return defaultPorts
-	}
-	annotation, ok := ew.Annotations[pkgK8s.ProxyOpaquePortsAnnotation]
-	if !ok {
-		return defaultPorts
-	}
-	opaquePorts := make(map[uint32]struct{})
-	if annotation != "" {
-		for _, pr := range parseExternalWorkloadOpaquePorts(annotation, ew) {
-			for _, port := range pr.Ports() {
-				opaquePorts[uint32(port)] = struct{}{}
-			}
-		}
-	}
-	return opaquePorts
-}
-
-func parseExternalWorkloadOpaquePorts(override string, ew *ewv1alpha1.ExternalWorkload) []util.PortRange {
-	portRanges := util.GetPortRanges(override)
-	var values []util.PortRange
-	for _, pr := range portRanges {
-		port, named := isNamedInExternalWorkload(pr, ew)
-		if named {
-			values = append(values, util.PortRange{UpperBound: int(port), LowerBound: int(port)})
-		} else {
-			pr, err := util.ParsePortRange(pr)
-			if err != nil {
-				logging.Warnf("Invalid port range [%v]: %s", pr, err)
-				continue
-			}
-			values = append(values, pr)
-		}
-	}
-	return values
-}
-
-func isNamedInExternalWorkload(pr string, ew *ewv1alpha1.ExternalWorkload) (int32, bool) {
-	for _, p := range ew.Spec.Ports {
-		if p.Name == pr {
-			return p.Port, true
-		}
-	}
-
-	return 0, false
-}
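These helpers are removed from the endpoint translator and the same resolution logic is used via the exported watcher.GetAnnotatedOpaquePortsForExternalWorkload in the hunks above. The standalone sketch below shows that resolution under simplified assumptions: a plain port struct stands in for the ExternalWorkload spec ports, and only comma-separated single entries are handled (the real code also accepts port ranges via the util helpers).

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// portSpec is a simplified stand-in for an ExternalWorkload spec port.
type portSpec struct {
	Name string
	Port uint32
}

// opaquePortsFromAnnotation resolves an opaque-ports annotation against a
// workload's declared ports: entries matching a port name resolve to that
// port's number, numeric entries are used as-is, anything else is skipped.
func opaquePortsFromAnnotation(annotation string, ports []portSpec) map[uint32]struct{} {
	opaque := map[uint32]struct{}{}
	for _, entry := range strings.Split(annotation, ",") {
		entry = strings.TrimSpace(entry)
		if entry == "" {
			continue
		}
		named := false
		for _, p := range ports {
			if p.Name == entry {
				opaque[p.Port] = struct{}{}
				named = true
				break
			}
		}
		if named {
			continue
		}
		if n, err := strconv.ParseUint(entry, 10, 32); err == nil {
			opaque[uint32(n)] = struct{}{}
		}
	}
	return opaque
}

func main() {
	ports := []portSpec{{Name: "admin", Port: 9990}, {Port: 8989}}
	fmt.Println(opaquePortsFromAnnotation("4242,admin", ports)) // map[4242:{} 9990:{}]
}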
|||
|
|
@@ -362,7 +362,7 @@ func (ec *EndpointsController) updateService(obj interface{}) {
 	ec.queue.Add(key)
 }
 
-func isReady(ew *ewv1alpha1.ExternalWorkload) bool {
+func IsReady(ew *ewv1alpha1.ExternalWorkload) bool {
 	if len(ew.Status.Conditions) == 0 {
 		return false
 	}
@@ -405,7 +405,7 @@ func labelsChanged(old, updated *ewv1alpha1.ExternalWorkload) bool {
 // since these are going to influence a change in a service's underlying
 // endpoint slice
 func specChanged(old, updated *ewv1alpha1.ExternalWorkload) bool {
-	if isReady(old) != isReady(updated) {
+	if IsReady(old) != IsReady(updated) {
 		return true
 	}
 
|
|||
|
|
@@ -42,7 +42,7 @@ type (
 
 		config Config
 
-		pods        *watcher.PodWatcher
+		workloads   *watcher.WorkloadWatcher
 		endpoints   *watcher.EndpointsWatcher
 		opaquePorts *watcher.OpaquePortsWatcher
 		profiles    *watcher.ProfileWatcher
@@ -86,7 +86,7 @@ func NewServer(
 		return nil, err
 	}
 
-	pods, err := watcher.NewPodWatcher(k8sAPI, metadataAPI, log, config.DefaultOpaquePorts)
+	workloads, err := watcher.NewWorkloadWatcher(k8sAPI, metadataAPI, log, config.DefaultOpaquePorts)
 	if err != nil {
 		return nil, err
 	}
@@ -106,7 +106,7 @@ func NewServer(
 	srv := server{
 		pb.UnimplementedDestinationServer{},
 		config,
-		pods,
+		workloads,
 		endpoints,
 		opaquePorts,
 		profiles,
@@ -507,11 +507,11 @@ func (s *server) subscribeToEndpointProfile(
 	defer translator.Stop()
 
 	var err error
-	ip, err = s.pods.Subscribe(service, hostname, ip, port, translator)
+	ip, err = s.workloads.Subscribe(service, hostname, ip, port, translator)
 	if err != nil {
 		return err
 	}
-	defer s.pods.Unsubscribe(ip, port, translator)
+	defer s.workloads.Unsubscribe(ip, port, translator)
 
 	select {
 	case <-s.shutdown:
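The profile stream now resolves any workload IP through the workloads watcher instead of the pods watcher, but the lifecycle in subscribeToEndpointProfile is unchanged: subscribe with the translator, keep the (possibly resolved) IP for the matching unsubscribe, and tear down when the stream ends. A minimal sketch of that contract against a hypothetical watcher interface, not the real WorkloadWatcher API:

package main

import "fmt"

// listener and watcher are hypothetical stand-ins for the translator and the
// workloads watcher used by subscribeToEndpointProfile.
type listener interface{ Update(addr string) error }

type watcher interface {
	// Subscribe may resolve an empty IP from service/hostname and returns the
	// IP that the caller must later pass to Unsubscribe.
	Subscribe(ip string, port uint32, l listener) (string, error)
	Unsubscribe(ip string, port uint32, l listener)
}

type printListener struct{}

func (printListener) Update(addr string) error { fmt.Println("update:", addr); return nil }

type fakeWatcher struct{}

func (fakeWatcher) Subscribe(ip string, port uint32, l listener) (string, error) {
	if ip == "" {
		ip = "200.1.1.1" // pretend the watcher resolved the workload IP
	}
	return ip, l.Update(fmt.Sprintf("%s:%d", ip, port))
}

func (fakeWatcher) Unsubscribe(ip string, port uint32, l listener) {
	fmt.Println("unsubscribed:", ip, port)
}

func streamProfile(w watcher, ip string, port uint32) error {
	l := printListener{}
	ip, err := w.Subscribe(ip, port, l) // keep the resolved IP
	if err != nil {
		return err
	}
	defer w.Unsubscribe(ip, port, l) // matching teardown when the stream ends
	return nil
}

func main() { _ = streamProfile(fakeWatcher{}, "", 8989) }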
|||
|
|
@@ -39,6 +39,8 @@ const podIPPolicy = "172.17.0.16"
 const podIPStatefulSet = "172.17.13.15"
 const externalIP = "192.168.1.20"
 const externalIPv6 = "2001:db8::78"
+const externalWorkloadIP = "200.1.1.1"
+const externalWorkloadIPPolicy = "200.1.1.2"
 const port uint32 = 8989
 const opaquePort uint32 = 4242
 const skippedPort uint32 = 24224
|
@ -405,6 +407,47 @@ func TestGetProfiles(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
t.Run("Return profile with endpoint when using externalworkload IP", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
defer server.clusterStore.UnregisterGauges()
|
||||
|
||||
stream := profileStream(t, server, externalWorkloadIP, port, "ns:ns")
|
||||
defer stream.Cancel()
|
||||
|
||||
epAddr, err := toAddress(externalWorkloadIP, port)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// An explanation for why we expect 1 to 3 updates is in test cases
|
||||
// above
|
||||
updates := stream.Updates()
|
||||
if len(updates) == 0 || len(updates) > 3 {
|
||||
t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
|
||||
}
|
||||
|
||||
first := updates[0]
|
||||
if first.Endpoint == nil {
|
||||
t.Fatalf("Expected response to have endpoint field")
|
||||
}
|
||||
if first.OpaqueProtocol {
|
||||
t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
|
||||
}
|
||||
_, exists := first.Endpoint.MetricLabels["namespace"]
|
||||
if !exists {
|
||||
t.Fatalf("Expected 'namespace' metric label to exist but it did not %v", first.Endpoint)
|
||||
}
|
||||
if first.GetEndpoint().GetProtocolHint() == nil {
|
||||
t.Fatalf("Expected protocol hint but found none")
|
||||
}
|
||||
if first.GetEndpoint().GetProtocolHint().GetOpaqueTransport() != nil {
|
||||
t.Fatalf("Expected externalworkload to not support opaque traffic on port %d", port)
|
||||
}
|
||||
if first.Endpoint.Addr.String() != epAddr.String() {
|
||||
t.Fatalf("Expected endpoint IP to be %s, but it was %s", epAddr.Ip, first.Endpoint.Addr.Ip)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return default profile when IP does not map to service or pod", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
defer server.clusterStore.UnregisterGauges()
|
||||
|
|
@ -558,6 +601,47 @@ func TestGetProfiles(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
t.Run("Return opaque protocol profile with endpoint when using externalworkload IP and opaque protocol port", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
defer server.clusterStore.UnregisterGauges()
|
||||
|
||||
stream := profileStream(t, server, externalWorkloadIP, opaquePort, "")
|
||||
defer stream.Cancel()
|
||||
|
||||
epAddr, err := toAddress(externalWorkloadIP, opaquePort)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// An explanation for why we expect 1 to 3 updates is in test cases
|
||||
// above
|
||||
updates := stream.Updates()
|
||||
if len(updates) == 0 || len(updates) > 3 {
|
||||
t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
|
||||
}
|
||||
|
||||
profile := assertSingleProfile(t, updates)
|
||||
if profile.Endpoint == nil {
|
||||
t.Fatalf("Expected response to have endpoint field")
|
||||
}
|
||||
if !profile.OpaqueProtocol {
|
||||
t.Fatalf("Expected port %d to be an opaque protocol, but it was not", opaquePort)
|
||||
}
|
||||
_, exists := profile.Endpoint.MetricLabels["namespace"]
|
||||
if !exists {
|
||||
t.Fatalf("Expected 'namespace' metric label to exist but it did not")
|
||||
}
|
||||
if profile.Endpoint.ProtocolHint == nil {
|
||||
t.Fatalf("Expected protocol hint but found none")
|
||||
}
|
||||
if profile.Endpoint.GetProtocolHint().GetOpaqueTransport().GetInboundPort() != 4143 {
|
||||
t.Fatalf("Expected pod to support opaque traffic on port 4143")
|
||||
}
|
||||
if profile.Endpoint.Addr.String() != epAddr.String() {
|
||||
t.Fatalf("Expected endpoint IP port to be %d, but it was %d", epAddr.Port, profile.Endpoint.Addr.Port)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return profile with opaque protocol when using Pod IP selected by a Server", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
defer server.clusterStore.UnregisterGauges()
|
||||
|
|
@ -579,6 +663,27 @@ func TestGetProfiles(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
t.Run("Return profile with opaque protocol when using externalworkload IP selected by a Server", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
defer server.clusterStore.UnregisterGauges()
|
||||
|
||||
stream := profileStream(t, server, externalWorkloadIPPolicy, 80, "")
|
||||
defer stream.Cancel()
|
||||
profile := assertSingleProfile(t, stream.Updates())
|
||||
if profile.Endpoint == nil {
|
||||
t.Fatalf("Expected response to have endpoint field")
|
||||
}
|
||||
if !profile.OpaqueProtocol {
|
||||
t.Fatalf("Expected port %d to be an opaque protocol, but it was not", 80)
|
||||
}
|
||||
if profile.Endpoint.GetProtocolHint() == nil {
|
||||
t.Fatalf("Expected protocol hint but found none")
|
||||
}
|
||||
if profile.Endpoint.GetProtocolHint().GetOpaqueTransport().GetInboundPort() != 4143 {
|
||||
t.Fatalf("Expected pod to support opaque traffic on port 4143")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return profile with opaque protocol when using an opaque port with an external IP", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
defer server.clusterStore.UnregisterGauges()
|
||||
|
|
|
|||
|
|
@ -337,6 +337,9 @@ spec:
|
|||
podSelector:
|
||||
matchLabels:
|
||||
app: policy-test
|
||||
externalWorkloadSelector:
|
||||
matchLabels:
|
||||
app: external-workload-policy-test
|
||||
port: 80
|
||||
proxyProtocol: opaque`,
|
||||
}
|
||||
|
|
@ -445,6 +448,51 @@ spec:
|
|||
- port: 80`,
|
||||
}
|
||||
|
||||
externalWorkloads := []string{`
|
||||
apiVersion: workload.linkerd.io/v1alpha1
|
||||
kind: ExternalWorkload
|
||||
metadata:
|
||||
name: my-cool-workload
|
||||
namespace: ns
|
||||
annotations:
|
||||
config.linkerd.io/opaque-ports: "4242"
|
||||
spec:
|
||||
meshTls:
|
||||
identity: spiffe://some-domain/cool
|
||||
serverName: server.local
|
||||
workloadIPs:
|
||||
- ip: 200.1.1.1
|
||||
ports:
|
||||
- port: 8989
|
||||
- port: 4242
|
||||
- name: linkerd-proxy
|
||||
port: 4143
|
||||
status:
|
||||
conditions:
|
||||
ready: true`,
|
||||
`
|
||||
apiVersion: workload.linkerd.io/v1alpha1
|
||||
kind: ExternalWorkload
|
||||
metadata:
|
||||
name: policy-test-workload
|
||||
namespace: ns
|
||||
labels:
|
||||
app: external-workload-policy-test
|
||||
spec:
|
||||
meshTls:
|
||||
identity: spiffe://some-domain/cool
|
||||
serverName: server.local
|
||||
workloadIPs:
|
||||
- ip: 200.1.1.2
|
||||
ports:
|
||||
- port: 80
|
||||
- name: linkerd-proxy
|
||||
port: 4143
|
||||
status:
|
||||
conditions:
|
||||
ready: true`,
|
||||
}
|
||||
|
||||
res := append(meshedPodResources, clientSP...)
|
||||
res = append(res, unmeshedPod)
|
||||
res = append(res, meshedOpaquePodResources...)
|
||||
|
|
@ -455,6 +503,7 @@ spec:
|
|||
res = append(res, hostPortMapping...)
|
||||
res = append(res, mirrorServiceResources...)
|
||||
res = append(res, destinationCredentialsResources...)
|
||||
res = append(res, externalWorkloads...)
|
||||
k8sAPI, l5dClient, err := k8s.NewFakeAPIWithL5dClient(res...)
|
||||
if err != nil {
|
||||
t.Fatalf("NewFakeAPIWithL5dClient returned an error: %s", err)
|
||||
|
|
@ -479,9 +528,9 @@ spec:
|
|||
t.Fatalf("initializeIndexers returned an error: %s", err)
|
||||
}
|
||||
|
||||
pods, err := watcher.NewPodWatcher(k8sAPI, metadataAPI, log, defaultOpaquePorts)
|
||||
workloads, err := watcher.NewWorkloadWatcher(k8sAPI, metadataAPI, log, defaultOpaquePorts)
|
||||
if err != nil {
|
||||
t.Fatalf("can't create Pods watcher: %s", err)
|
||||
t.Fatalf("can't create Workloads watcher: %s", err)
|
||||
}
|
||||
endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, log, false, "local")
|
||||
if err != nil {
|
||||
|
|
@ -516,7 +565,7 @@ spec:
|
|||
IdentityTrustDomain: "trust.domain",
|
||||
DefaultOpaquePorts: defaultOpaquePorts,
|
||||
},
|
||||
pods,
|
||||
workloads,
|
||||
endpoints,
|
||||
opaquePorts,
|
||||
profiles,
|
||||
|
|
|
|||
|
|
@@ -916,7 +916,7 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A
 			continue
 		}
 
-		err = setToServerProtocolExternalWorkload(pp.k8sAPI, &address, resolvedPort)
+		err = SetToServerProtocolExternalWorkload(pp.k8sAPI, &address, resolvedPort)
 		if err != nil {
 			pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
 			continue
@@ -1499,7 +1499,7 @@ func SetToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error {
 
 // setToServerProtocolExternalWorkload sets the address's OpaqueProtocol field based off any
 // Servers that select it and override the expected protocol for ExternalWorkloads.
-func setToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address, port Port) error {
+func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address, port Port) error {
 	if address.ExternalWorkload == nil {
 		return fmt.Errorf("endpoint not backed by ExternalWorkload: %s:%d", address.IP, address.Port)
 	}
|
|||
|
|
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"net"
 
+	ext "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1alpha1"
 	"github.com/linkerd/linkerd2/controller/k8s"
 	"github.com/prometheus/client_golang/prometheus"
 	corev1 "k8s.io/api/core/v1"
@@ -17,6 +18,8 @@ const (
 	PodIPIndex = "ip"
 	// HostIPIndex is the key for the index based on Host IP of pods with host network enabled
 	HostIPIndex = "hostIP"
+	// ExternalWorkloadIPIndex is the key for the index based on IP of externalworkloads
+	ExternalWorkloadIPIndex = "externalWorkloadIP"
 )
 
 type (
@@ -156,6 +159,25 @@ func InitializeIndexers(k8sAPI *k8s.API) error {
 		return fmt.Errorf("could not create an indexer for pods: %w", err)
 	}
 
+	err = k8sAPI.ExtWorkload().Informer().AddIndexers(cache.Indexers{ExternalWorkloadIPIndex: func(obj interface{}) ([]string, error) {
+		ew, ok := obj.(*ext.ExternalWorkload)
+		if !ok {
+			return nil, errors.New("object is not an externalworkload")
+		}
+
+		addrs := []string{}
+		for _, ip := range ew.Spec.WorkloadIPs {
+			for _, port := range ew.Spec.Ports {
+				addrs = append(addrs, net.JoinHostPort(ip.Ip, fmt.Sprintf("%d", port.Port)))
+			}
+		}
+		return addrs, nil
+	}})
+
+	if err != nil {
+		return fmt.Errorf("could not create an indexer for externalworkloads: %w", err)
+	}
+
 	return nil
 }
 
@@ -175,6 +197,19 @@ func getIndexedPods(k8sAPI *k8s.API, indexName string, key string) ([]*corev1.Po
 	return pods, nil
 }
 
+func getIndexedExternalWorkloads(k8sAPI *k8s.API, indexName string, key string) ([]*ext.ExternalWorkload, error) {
+	objs, err := k8sAPI.ExtWorkload().Informer().GetIndexer().ByIndex(indexName, key)
+	if err != nil {
+		return nil, fmt.Errorf("failed getting %s indexed externalworkloads: %w", indexName, err)
+	}
+	workloads := make([]*ext.ExternalWorkload, 0)
+	for _, obj := range objs {
+		workload := obj.(*ext.ExternalWorkload)
+		workloads = append(workloads, workload)
+	}
+	return workloads, nil
+}
+
 func podNotTerminating(pod *corev1.Pod) bool {
 	phase := pod.Status.Phase
 	podTerminated := phase == corev1.PodSucceeded || phase == corev1.PodFailed
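The indexer keys every ExternalWorkload by each ip:port pair it declares, and the same key is what getIndexedExternalWorkloads is later queried with when the workload watcher resolves a dialed address. A standalone sketch of the key computation, with a simplified spec type standing in for ext.ExternalWorkload:

package main

import (
	"fmt"
	"net"
)

// simplified stand-ins for the ExternalWorkload spec fields used by the indexer
type workloadIP struct{ Ip string }
type workloadPort struct{ Port int32 }
type workloadSpec struct {
	WorkloadIPs []workloadIP
	Ports       []workloadPort
}

// indexKeys mirrors the indexer function above: one key per IP/port pair,
// formed with net.JoinHostPort so IPv6 addresses are bracketed correctly.
func indexKeys(spec workloadSpec) []string {
	addrs := []string{}
	for _, ip := range spec.WorkloadIPs {
		for _, port := range spec.Ports {
			addrs = append(addrs, net.JoinHostPort(ip.Ip, fmt.Sprintf("%d", port.Port)))
		}
	}
	return addrs
}

func main() {
	spec := workloadSpec{
		WorkloadIPs: []workloadIP{{Ip: "200.1.1.1"}, {Ip: "2001:db8::78"}},
		Ports:       []workloadPort{{Port: 8989}, {Port: 4143}},
	}
	fmt.Println(indexKeys(spec))
	// [200.1.1.1:8989 200.1.1.1:4143 [2001:db8::78]:8989 [2001:db8::78]:4143]
}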
|
|||
|
|
@ -1,583 +0,0 @@
|
|||
package watcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta2"
|
||||
"github.com/linkerd/linkerd2/controller/k8s"
|
||||
consts "github.com/linkerd/linkerd2/pkg/k8s"
|
||||
"github.com/linkerd/linkerd2/pkg/util"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
logging "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
type (
|
||||
// PodWatcher watches all pods in the cluster. It keeps a map of publishers
|
||||
// keyed by IP and port.
|
||||
PodWatcher struct {
|
||||
defaultOpaquePorts map[uint32]struct{}
|
||||
k8sAPI *k8s.API
|
||||
metadataAPI *k8s.MetadataAPI
|
||||
publishers map[IPPort]*podPublisher
|
||||
log *logging.Entry
|
||||
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// podPublisher represents an ip:port along with the backing pod (if any).
|
||||
// It keeps a list of listeners to be notified whenever the pod or the
|
||||
// associated opaque protocol config changes.
|
||||
podPublisher struct {
|
||||
defaultOpaquePorts map[uint32]struct{}
|
||||
k8sAPI *k8s.API
|
||||
metadataAPI *k8s.MetadataAPI
|
||||
ip string
|
||||
port Port
|
||||
pod *corev1.Pod
|
||||
listeners []PodUpdateListener
|
||||
metrics metrics
|
||||
log *logging.Entry
|
||||
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// PodUpdateListener is the interface subscribers must implement.
|
||||
PodUpdateListener interface {
|
||||
Update(*Address) error
|
||||
}
|
||||
)
|
||||
|
||||
var ipPortVecs = newMetricsVecs("ip_port", []string{"ip", "port"})
|
||||
|
||||
func NewPodWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, defaultOpaquePorts map[uint32]struct{}) (*PodWatcher, error) {
|
||||
pw := &PodWatcher{
|
||||
defaultOpaquePorts: defaultOpaquePorts,
|
||||
k8sAPI: k8sAPI,
|
||||
metadataAPI: metadataAPI,
|
||||
publishers: make(map[IPPort]*podPublisher),
|
||||
log: log.WithFields(logging.Fields{
|
||||
"component": "pod-watcher",
|
||||
}),
|
||||
}
|
||||
|
||||
_, err := k8sAPI.Pod().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: pw.addPod,
|
||||
DeleteFunc: pw.deletePod,
|
||||
UpdateFunc: pw.updatePod,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: pw.updateServers,
|
||||
DeleteFunc: pw.updateServers,
|
||||
UpdateFunc: pw.updateServer,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pw, nil
|
||||
}
|
||||
|
||||
// Subscribe notifies the listener on changes on any pod backing the passed
|
||||
// host/ip:port or changes to its associated opaque protocol config. If service
|
||||
// and hostname are empty then ip should be set and vice-versa. If ip is empty,
|
||||
// the corresponding ip is found for the given service/hostname, and returned.
|
||||
func (pw *PodWatcher) Subscribe(service *ServiceID, hostname, ip string, port Port, listener PodUpdateListener) (string, error) {
|
||||
if hostname != "" {
|
||||
pw.log.Debugf("Establishing watch on pod %s.%s.%s:%d", hostname, service.Name, service.Namespace, port)
|
||||
} else if service != nil {
|
||||
pw.log.Debugf("Establishing watch on pod %s.%s:%d", service.Name, service.Namespace, port)
|
||||
} else {
|
||||
pw.log.Debugf("Establishing watch on pod %s:%d", ip, port)
|
||||
}
|
||||
pp, err := pw.getOrNewPodPublisher(service, hostname, ip, port)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
pp.subscribe(listener)
|
||||
|
||||
address, err := pp.createAddress()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err = listener.Update(&address); err != nil {
|
||||
return "", fmt.Errorf("failed to send initial update: %w", err)
|
||||
}
|
||||
pp.metrics.incUpdates()
|
||||
|
||||
return pp.ip, nil
|
||||
}
|
||||
|
||||
// Subscribe stops notifying the listener on chages on any pod backing the
|
||||
// passed ip:port or its associated protocol config
|
||||
func (pw *PodWatcher) Unsubscribe(ip string, port Port, listener PodUpdateListener) {
|
||||
pw.mu.Lock()
|
||||
defer pw.mu.Unlock()
|
||||
|
||||
pw.log.Debugf("Stopping watch on %s:%d", ip, port)
|
||||
pp, ok := pw.getPodPublisher(ip, port)
|
||||
if !ok {
|
||||
pw.log.Errorf("Cannot unsubscribe from unknown ip:port [%s:%d]", ip, port)
|
||||
return
|
||||
}
|
||||
pp.unsubscribe(listener)
|
||||
|
||||
if len(pp.listeners) == 0 {
|
||||
delete(pw.publishers, IPPort{pp.ip, pp.port})
|
||||
}
|
||||
}
|
||||
|
||||
// addPod is an event handler so it cannot block
|
||||
func (pw *PodWatcher) addPod(obj any) {
|
||||
pod := obj.(*corev1.Pod)
|
||||
pw.log.Tracef("Added pod %s.%s", pod.Name, pod.Namespace)
|
||||
go pw.submitPodUpdate(pod, false)
|
||||
}
|
||||
|
||||
// deletePod is an event handler so it cannot block
|
||||
func (pw *PodWatcher) deletePod(obj any) {
|
||||
pod, ok := obj.(*corev1.Pod)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
pw.log.Errorf("Couldn't get object from DeletedFinalStateUnknown %#v", obj)
|
||||
return
|
||||
}
|
||||
pod, ok = tombstone.Obj.(*corev1.Pod)
|
||||
if !ok {
|
||||
pw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Pod %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
pw.log.Tracef("Deleted pod %s.%s", pod.Name, pod.Namespace)
|
||||
go pw.submitPodUpdate(pod, true)
|
||||
}
|
||||
|
||||
// updatePod is an event handler so it cannot block
|
||||
func (pw *PodWatcher) updatePod(oldObj any, newObj any) {
|
||||
oldPod := oldObj.(*corev1.Pod)
|
||||
newPod := newObj.(*corev1.Pod)
|
||||
if oldPod.DeletionTimestamp == nil && newPod.DeletionTimestamp != nil {
|
||||
// this is just a mark, wait for actual deletion event
|
||||
return
|
||||
}
|
||||
|
||||
oldUpdated := latestUpdated(oldPod.ManagedFields)
|
||||
updated := latestUpdated(newPod.ManagedFields)
|
||||
if !updated.IsZero() && updated != oldUpdated {
|
||||
delta := time.Since(updated)
|
||||
podInformerLag.Observe(delta.Seconds())
|
||||
}
|
||||
|
||||
pw.log.Tracef("Updated pod %s.%s", newPod.Name, newPod.Namespace)
|
||||
go pw.submitPodUpdate(newPod, false)
|
||||
}
|
||||
|
||||
func (pw *PodWatcher) submitPodUpdate(pod *corev1.Pod, remove bool) {
|
||||
pw.mu.RLock()
|
||||
defer pw.mu.RUnlock()
|
||||
|
||||
submitPod := pod
|
||||
if remove {
|
||||
submitPod = nil
|
||||
}
|
||||
|
||||
for _, container := range pod.Spec.Containers {
|
||||
for _, containerPort := range container.Ports {
|
||||
if containerPort.ContainerPort != 0 {
|
||||
for _, pip := range pod.Status.PodIPs {
|
||||
if pp, ok := pw.getPodPublisher(pip.IP, Port(containerPort.ContainerPort)); ok {
|
||||
pp.updatePod(submitPod)
|
||||
}
|
||||
}
|
||||
if len(pod.Status.PodIPs) == 0 && pod.Status.PodIP != "" {
|
||||
if pp, ok := pw.getPodPublisher(pod.Status.PodIP, Port(containerPort.ContainerPort)); ok {
|
||||
pp.updatePod(submitPod)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if containerPort.HostPort != 0 {
|
||||
for _, hip := range pod.Status.HostIPs {
|
||||
if pp, ok := pw.getPodPublisher(hip.IP, Port(containerPort.HostPort)); ok {
|
||||
pp.updatePod(submitPod)
|
||||
}
|
||||
}
|
||||
if len(pod.Status.HostIPs) == 0 && pod.Status.HostIP != "" {
|
||||
if pp, ok := pw.getPodPublisher(pod.Status.HostIP, Port(containerPort.HostPort)); ok {
|
||||
pp.updatePod(submitPod)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (pw *PodWatcher) updateServer(oldObj interface{}, newObj interface{}) {
|
||||
oldServer := oldObj.(*v1beta2.Server)
|
||||
newServer := newObj.(*v1beta2.Server)
|
||||
|
||||
oldUpdated := latestUpdated(oldServer.ManagedFields)
|
||||
updated := latestUpdated(newServer.ManagedFields)
|
||||
if !updated.IsZero() && updated != oldUpdated {
|
||||
delta := time.Since(updated)
|
||||
serverInformerLag.Observe(delta.Seconds())
|
||||
}
|
||||
|
||||
pw.updateServers(newObj)
|
||||
}
|
||||
|
||||
// updateServer triggers an Update() call to the listeners of the podPublishers
|
||||
// whose pod matches the Server's selector. This function is an event handler
|
||||
// so it cannot block.
|
||||
func (pw *PodWatcher) updateServers(_ any) {
|
||||
pw.mu.RLock()
|
||||
defer pw.mu.RUnlock()
|
||||
|
||||
for _, pp := range pw.publishers {
|
||||
if pp.pod == nil {
|
||||
continue
|
||||
}
|
||||
opaquePorts := GetAnnotatedOpaquePorts(pp.pod, pw.defaultOpaquePorts)
|
||||
_, isOpaque := opaquePorts[pp.port]
|
||||
// if port is annotated to be always opaque we can disregard Server updates
|
||||
if isOpaque {
|
||||
continue
|
||||
}
|
||||
|
||||
go func(pp *podPublisher) {
|
||||
pp.mu.RLock()
|
||||
defer pp.mu.RUnlock()
|
||||
|
||||
updated := false
|
||||
for _, listener := range pp.listeners {
|
||||
// the Server in question doesn't carry information about other
|
||||
// Servers that might target this podPublisher; createAddress()
|
||||
// queries all the relevant Servers to determine the full state
|
||||
addr, err := pp.createAddress()
|
||||
if err != nil {
|
||||
pw.log.Errorf("Error creating address for pod: %s", err)
|
||||
continue
|
||||
}
|
||||
if err = listener.Update(&addr); err != nil {
|
||||
pw.log.Warnf("Error sending update to listener: %s", err)
|
||||
continue
|
||||
}
|
||||
updated = true
|
||||
}
|
||||
if updated {
|
||||
pp.metrics.incUpdates()
|
||||
}
|
||||
}(pp)
|
||||
}
|
||||
}
|
||||
|
||||
// getOrNewPodPublisher returns the podPublisher for the given target if it
|
||||
// exists. Otherwise, it creates a new one and returns it.
|
||||
func (pw *PodWatcher) getOrNewPodPublisher(service *ServiceID, hostname, ip string, port Port) (*podPublisher, error) {
|
||||
pw.mu.Lock()
|
||||
defer pw.mu.Unlock()
|
||||
|
||||
var pod *corev1.Pod
|
||||
var err error
|
||||
if hostname != "" {
|
||||
pod, err = pw.getEndpointByHostname(hostname, service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ip = pod.Status.PodIP
|
||||
} else {
|
||||
pod, err = pw.getPodByPodIP(ip, port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if pod == nil {
|
||||
pod, err = pw.getPodByHostIP(ip, port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ipPort := IPPort{ip, port}
|
||||
pp, ok := pw.publishers[ipPort]
|
||||
if !ok {
|
||||
pp = &podPublisher{
|
||||
defaultOpaquePorts: pw.defaultOpaquePorts,
|
||||
k8sAPI: pw.k8sAPI,
|
||||
metadataAPI: pw.metadataAPI,
|
||||
ip: ip,
|
||||
port: port,
|
||||
pod: pod,
|
||||
metrics: ipPortVecs.newMetrics(prometheus.Labels{
|
||||
"ip": ip,
|
||||
"port": strconv.FormatUint(uint64(port), 10),
|
||||
}),
|
||||
log: pw.log.WithFields(logging.Fields{
|
||||
"component": "pod-publisher",
|
||||
"ip": ip,
|
||||
"port": port,
|
||||
}),
|
||||
}
|
||||
pw.publishers[ipPort] = pp
|
||||
}
|
||||
return pp, nil
|
||||
}
|
||||
|
||||
func (pw *PodWatcher) getPodPublisher(ip string, port Port) (pp *podPublisher, ok bool) {
|
||||
ipPort := IPPort{ip, port}
|
||||
pp, ok = pw.publishers[ipPort]
|
||||
return
|
||||
}
|
||||
|
||||
// getPodByPodIP returns a pod that maps to the given IP address in the pod network
|
||||
func (pw *PodWatcher) getPodByPodIP(podIP string, port uint32) (*corev1.Pod, error) {
|
||||
podIPPods, err := getIndexedPods(pw.k8sAPI, PodIPIndex, podIP)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Unknown, err.Error())
|
||||
}
|
||||
if len(podIPPods) == 1 {
|
||||
pw.log.Debugf("found %s on the pod network", podIP)
|
||||
return podIPPods[0], nil
|
||||
}
|
||||
if len(podIPPods) > 1 {
|
||||
conflictingPods := []string{}
|
||||
for _, pod := range podIPPods {
|
||||
conflictingPods = append(conflictingPods, fmt.Sprintf("%s:%s", pod.Namespace, pod.Name))
|
||||
}
|
||||
pw.log.Warnf("found conflicting %s IP on the pod network: %s", podIP, strings.Join(conflictingPods, ","))
|
||||
return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting pod network IP %s", len(podIPPods), podIP)
|
||||
}
|
||||
|
||||
pw.log.Debugf("no pod found for %s:%d", podIP, port)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// getPodByHostIP returns a pod that maps to the given IP address in the host
|
||||
// network. It must have a container port that exposes `port` as a host port.
|
||||
func (pw *PodWatcher) getPodByHostIP(hostIP string, port uint32) (*corev1.Pod, error) {
|
||||
addr := net.JoinHostPort(hostIP, fmt.Sprintf("%d", port))
|
||||
hostIPPods, err := getIndexedPods(pw.k8sAPI, HostIPIndex, addr)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Unknown, err.Error())
|
||||
}
|
||||
if len(hostIPPods) == 1 {
|
||||
pw.log.Debugf("found %s:%d on the host network", hostIP, port)
|
||||
return hostIPPods[0], nil
|
||||
}
|
||||
if len(hostIPPods) > 1 {
|
||||
conflictingPods := []string{}
|
||||
for _, pod := range hostIPPods {
|
||||
conflictingPods = append(conflictingPods, fmt.Sprintf("%s:%s", pod.Namespace, pod.Name))
|
||||
}
|
||||
pw.log.Warnf("found conflicting %s:%d endpoint on the host network: %s", hostIP, port, strings.Join(conflictingPods, ","))
|
||||
return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting host network endpoint %s:%d", len(hostIPPods), hostIP, port)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// getEndpointByHostname returns a pod that maps to the given hostname (or an
|
||||
// instanceID). The hostname is generally the prefix of the pod's DNS name;
|
||||
// since it may be arbitrary we need to look at the corresponding service's
|
||||
// Endpoints object to see whether the hostname matches a pod.
|
||||
func (pw *PodWatcher) getEndpointByHostname(hostname string, svcID *ServiceID) (*corev1.Pod, error) {
|
||||
ep, err := pw.k8sAPI.Endpoint().Lister().Endpoints(svcID.Namespace).Get(svcID.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, subset := range ep.Subsets {
|
||||
for _, addr := range subset.Addresses {
|
||||
|
||||
if hostname == addr.Hostname {
|
||||
if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
|
||||
podName := addr.TargetRef.Name
|
||||
podNamespace := addr.TargetRef.Namespace
|
||||
pod, err := pw.k8sAPI.Pod().Lister().Pods(podNamespace).Get(podName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pod, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, status.Errorf(codes.NotFound, "no pod found in Endpoints %s/%s for hostname %s", svcID.Namespace, svcID.Name, hostname)
|
||||
}
|
||||
|
||||
func (pp *podPublisher) subscribe(listener PodUpdateListener) {
|
||||
pp.mu.Lock()
|
||||
defer pp.mu.Unlock()
|
||||
|
||||
pp.listeners = append(pp.listeners, listener)
|
||||
pp.metrics.setSubscribers(len(pp.listeners))
|
||||
}
|
||||
|
||||
func (pp *podPublisher) unsubscribe(listener PodUpdateListener) {
|
||||
pp.mu.Lock()
|
||||
defer pp.mu.Unlock()
|
||||
|
||||
for i, e := range pp.listeners {
|
||||
if e == listener {
|
||||
n := len(pp.listeners)
|
||||
pp.listeners[i] = pp.listeners[n-1]
|
||||
pp.listeners[n-1] = nil
|
||||
pp.listeners = pp.listeners[:n-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
pp.metrics.setSubscribers(len(pp.listeners))
|
||||
}
|
||||
|
||||
// updatePod creates an Address instance for the given pod, that is passed to
|
||||
// the listener's Update() method, only if the pod's readiness state has
|
||||
// changed. If the passed pod is nil, it means the pod (still referred to in
|
||||
// pp.pod) has been deleted.
|
||||
func (pp *podPublisher) updatePod(pod *corev1.Pod) {
|
||||
pp.mu.Lock()
|
||||
defer pp.mu.Unlock()
|
||||
|
||||
// pod wasn't ready or there was no backing pod - check if passed pod is ready
|
||||
if pp.pod == nil {
|
||||
if pod == nil {
|
||||
pp.log.Trace("Pod deletion event already consumed - ignore")
|
||||
return
|
||||
}
|
||||
|
||||
if !isRunningAndReady(pod) {
|
||||
pp.log.Tracef("Pod %s.%s not ready - ignore", pod.Name, pod.Namespace)
|
||||
return
|
||||
}
|
||||
|
||||
pp.log.Debugf("Pod %s.%s became ready", pod.Name, pod.Namespace)
|
||||
pp.pod = pod
|
||||
updated := false
|
||||
for _, l := range pp.listeners {
|
||||
addr, err := pp.createAddress()
|
||||
if err != nil {
|
||||
pp.log.Errorf("Error creating address for pod: %s", err)
|
||||
continue
|
||||
}
|
||||
if err = l.Update(&addr); err != nil {
|
||||
pp.log.Warnf("Error sending update to listener: %s", err)
|
||||
continue
|
||||
}
|
||||
updated = true
|
||||
}
|
||||
if updated {
|
||||
pp.metrics.incUpdates()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// backing pod becoming unready or getting deleted
|
||||
if pod == nil || !isRunningAndReady(pod) {
|
||||
pp.log.Debugf("Pod %s.%s deleted or it became unready - remove", pp.pod.Name, pp.pod.Namespace)
|
||||
pp.pod = nil
|
||||
updated := false
|
||||
for _, l := range pp.listeners {
|
||||
addr, err := pp.createAddress()
|
||||
if err != nil {
|
||||
pp.log.Errorf("Error creating address for pod: %s", err)
|
||||
continue
|
||||
}
|
||||
if err = l.Update(&addr); err != nil {
|
||||
pp.log.Warnf("Error sending update to listener: %s", err)
|
||||
continue
|
||||
}
|
||||
updated = true
|
||||
}
|
||||
if updated {
|
||||
pp.metrics.incUpdates()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
pp.log.Tracef("Ignored event on pod %s.%s", pod.Name, pod.Namespace)
|
||||
}
|
||||
|
||||
// createAddress returns an Address instance for the given ip, port and pod. It
|
||||
// completes the ownership and opaque protocol information
|
||||
func (pp *podPublisher) createAddress() (Address, error) {
|
||||
var ownerKind, ownerName string
|
||||
var err error
|
||||
if pp.pod != nil {
|
||||
ownerKind, ownerName, err = pp.metadataAPI.GetOwnerKindAndName(context.Background(), pp.pod, true)
|
||||
if err != nil {
|
||||
return Address{}, err
|
||||
}
|
||||
}
|
||||
|
||||
address := Address{
|
||||
IP: pp.ip,
|
||||
Port: pp.port,
|
||||
Pod: pp.pod,
|
||||
OwnerName: ownerName,
|
||||
OwnerKind: ownerKind,
|
||||
}
|
||||
|
||||
// Override opaqueProtocol if the endpoint's port is annotated as opaque
|
||||
opaquePorts := GetAnnotatedOpaquePorts(pp.pod, pp.defaultOpaquePorts)
|
||||
if _, ok := opaquePorts[pp.port]; ok {
|
||||
address.OpaqueProtocol = true
|
||||
} else if pp.pod != nil {
|
||||
if err := SetToServerProtocol(pp.k8sAPI, &address, pp.port); err != nil {
|
||||
return Address{}, fmt.Errorf("failed to set address OpaqueProtocol: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return address, nil
|
||||
}
|
||||
|
||||
// GetAnnotatedOpaquePorts returns the opaque ports for the pod given its
|
||||
// annotations, or the default opaque ports if it's not annotated
|
||||
func GetAnnotatedOpaquePorts(pod *corev1.Pod, defaultPorts map[uint32]struct{}) map[uint32]struct{} {
|
||||
if pod == nil {
|
||||
return defaultPorts
|
||||
}
|
||||
annotation, ok := pod.Annotations[consts.ProxyOpaquePortsAnnotation]
|
||||
if !ok {
|
||||
return defaultPorts
|
||||
}
|
||||
opaquePorts := make(map[uint32]struct{})
|
||||
if annotation != "" {
|
||||
for _, pr := range util.ParseContainerOpaquePorts(annotation, pod.Spec.Containers) {
|
||||
for _, port := range pr.Ports() {
|
||||
opaquePorts[uint32(port)] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
return opaquePorts
|
||||
}
|
||||
|
||||
func isRunningAndReady(pod *corev1.Pod) bool {
|
||||
if pod == nil || pod.Status.Phase != corev1.PodRunning {
|
||||
return false
|
||||
}
|
||||
for _, condition := range pod.Status.Conditions {
|
||||
if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
|
@@ -87,6 +87,14 @@ var (
 		},
 	)
 
+	externalWorkloadInformerLag = promauto.NewHistogram(
+		prometheus.HistogramOpts{
+			Name:    "externalworkload_informer_lag_seconds",
+			Help:    "The amount of time between when an ExternalWorkload resource is updated and when an informer observes it",
+			Buckets: informer_lag_seconds_buckets,
+		},
+	)
+
 	serviceProfileInformerLag = promauto.NewHistogram(
 		prometheus.HistogramOpts{
 			Name: "serviceprofiles_informer_lag_seconds",
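The new histogram is fed the same way as the existing pod and Server lag metrics: on an update event, the watcher compares the latest managedFields timestamp of the new object against the old one and observes the delta. A hedged sketch of that pattern follows; the bucket list and the latestUpdated helper here are simplified stand-ins for the repo's own versions.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// externalWorkloadInformerLag mirrors the metric added above; DefBuckets
// stands in for the repo's informer_lag_seconds_buckets.
var externalWorkloadInformerLag = promauto.NewHistogram(prometheus.HistogramOpts{
	Name:    "externalworkload_informer_lag_seconds",
	Help:    "The amount of time between when an ExternalWorkload resource is updated and when an informer observes it",
	Buckets: prometheus.DefBuckets,
})

// latestUpdated returns the most recent managedFields timestamp, i.e. the last
// time any manager wrote to the object.
func latestUpdated(fields []metav1.ManagedFieldsEntry) time.Time {
	var latest time.Time
	for _, f := range fields {
		if f.Time != nil && f.Time.Time.After(latest) {
			latest = f.Time.Time
		}
	}
	return latest
}

// observeLag is what an update handler would do with the old and new copies of
// an ExternalWorkload's managedFields.
func observeLag(oldFields, newFields []metav1.ManagedFieldsEntry) {
	oldUpdated := latestUpdated(oldFields)
	updated := latestUpdated(newFields)
	if !updated.IsZero() && updated != oldUpdated {
		externalWorkloadInformerLag.Observe(time.Since(updated).Seconds())
	}
}

func main() {
	now := metav1.NewTime(time.Now().Add(-2 * time.Second))
	observeLag(nil, []metav1.ManagedFieldsEntry{{Time: &now}})
}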
|||
|
|
@ -0,0 +1,840 @@
|
|||
package watcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
externalworkload "github.com/linkerd/linkerd2/controller/api/destination/external-workload"
|
||||
ext "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1alpha1"
|
||||
"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta2"
|
||||
"github.com/linkerd/linkerd2/controller/k8s"
|
||||
consts "github.com/linkerd/linkerd2/pkg/k8s"
|
||||
"github.com/linkerd/linkerd2/pkg/util"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
logging "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
type (
|
||||
// WorkloadWatcher watches all pods and externalworkloads in the cluster.
|
||||
// It keeps a map of publishers keyed by IP and port.
|
||||
WorkloadWatcher struct {
|
||||
defaultOpaquePorts map[uint32]struct{}
|
||||
k8sAPI *k8s.API
|
||||
metadataAPI *k8s.MetadataAPI
|
||||
publishers map[IPPort]*workloadPublisher
|
||||
log *logging.Entry
|
||||
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// workloadPublisher represents an ip:port along with the backing pod
|
||||
// or externalworkload (if any). It keeps a list of listeners to be notified
|
||||
// whenever the workload or the associated opaque protocol config changes.
|
||||
workloadPublisher struct {
|
||||
defaultOpaquePorts map[uint32]struct{}
|
||||
k8sAPI *k8s.API
|
||||
metadataAPI *k8s.MetadataAPI
|
||||
ip string
|
||||
port Port
|
||||
pod *corev1.Pod
|
||||
externalWorkload *ext.ExternalWorkload
|
||||
listeners []WorkloadUpdateListener
|
||||
metrics metrics
|
||||
log *logging.Entry
|
||||
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// WorkloadUpdateListener is the interface subscribers must implement.
|
||||
WorkloadUpdateListener interface {
|
||||
Update(*Address) error
|
||||
}
|
||||
)
|
||||
|
||||
var ipPortVecs = newMetricsVecs("ip_port", []string{"ip", "port"})
|
||||
|
||||
func NewWorkloadWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, defaultOpaquePorts map[uint32]struct{}) (*WorkloadWatcher, error) {
|
||||
ww := &WorkloadWatcher{
|
||||
defaultOpaquePorts: defaultOpaquePorts,
|
||||
k8sAPI: k8sAPI,
|
||||
metadataAPI: metadataAPI,
|
||||
publishers: make(map[IPPort]*workloadPublisher),
|
||||
log: log.WithFields(logging.Fields{
|
||||
"component": "workload-watcher",
|
||||
}),
|
||||
}
|
||||
|
||||
_, err := k8sAPI.Pod().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: ww.addPod,
|
||||
DeleteFunc: ww.deletePod,
|
||||
UpdateFunc: ww.updatePod,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = k8sAPI.ExtWorkload().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: ww.addExternalWorkload,
|
||||
DeleteFunc: ww.deleteExternalWorkload,
|
||||
UpdateFunc: ww.updateExternalWorkload,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: ww.updateServers,
|
||||
DeleteFunc: ww.updateServers,
|
||||
UpdateFunc: ww.updateServer,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ww, nil
|
||||
}
|
||||
|
||||
// Subscribe notifies the listener on changes on any workload backing the passed
|
||||
// host/ip:port or changes to its associated opaque protocol config. If service
|
||||
// and hostname are empty then ip should be set and vice-versa. If ip is empty,
|
||||
// the corresponding ip is found for the given service/hostname, and returned.
|
||||
func (ww *WorkloadWatcher) Subscribe(service *ServiceID, hostname, ip string, port Port, listener WorkloadUpdateListener) (string, error) {
|
||||
if hostname != "" {
|
||||
ww.log.Debugf("Establishing watch on workload %s.%s.%s:%d", hostname, service.Name, service.Namespace, port)
|
||||
} else if service != nil {
|
||||
ww.log.Debugf("Establishing watch on workload %s.%s:%d", service.Name, service.Namespace, port)
|
||||
} else {
|
||||
ww.log.Debugf("Establishing watch on workload %s:%d", ip, port)
|
||||
}
|
||||
pp, err := ww.getOrNewWorkloadPublisher(service, hostname, ip, port)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
pp.subscribe(listener)
|
||||
|
||||
address, err := pp.createAddress()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err = listener.Update(&address); err != nil {
|
||||
return "", fmt.Errorf("failed to send initial update: %w", err)
|
||||
}
|
||||
pp.metrics.incUpdates()
|
||||
|
||||
return pp.ip, nil
|
||||
}
|
||||
|
||||
// Unsubscribe stops notifying the listener on changes on any workload backing the
|
||||
// passed ip:port or its associated protocol config
|
||||
func (ww *WorkloadWatcher) Unsubscribe(ip string, port Port, listener WorkloadUpdateListener) {
|
||||
ww.mu.Lock()
|
||||
defer ww.mu.Unlock()
|
||||
|
||||
ww.log.Debugf("Stopping watch on %s:%d", ip, port)
|
||||
wp, ok := ww.getWorkloadPublisher(ip, port)
|
||||
if !ok {
|
||||
ww.log.Errorf("Cannot unsubscribe from unknown ip:port [%s:%d]", ip, port)
|
||||
return
|
||||
}
|
||||
wp.unsubscribe(listener)
|
||||
|
||||
if len(wp.listeners) == 0 {
|
||||
delete(ww.publishers, IPPort{wp.ip, wp.port})
|
||||
}
|
||||
}
|
||||
|
||||
// addPod is an event handler so it cannot block
|
||||
func (ww *WorkloadWatcher) addPod(obj any) {
|
||||
pod := obj.(*corev1.Pod)
|
||||
ww.log.Tracef("Added pod %s.%s", pod.Name, pod.Namespace)
|
||||
go ww.submitPodUpdate(pod, false)
|
||||
}
|
||||
|
||||
// deletePod is an event handler so it cannot block
|
||||
func (ww *WorkloadWatcher) deletePod(obj any) {
|
||||
pod, ok := obj.(*corev1.Pod)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
ww.log.Errorf("Couldn't get object from DeletedFinalStateUnknown %#v", obj)
|
||||
return
|
||||
}
|
||||
pod, ok = tombstone.Obj.(*corev1.Pod)
|
||||
if !ok {
|
||||
ww.log.Errorf("DeletedFinalStateUnknown contained object that is not a Pod %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
ww.log.Tracef("Deleted pod %s.%s", pod.Name, pod.Namespace)
|
||||
go ww.submitPodUpdate(pod, true)
|
||||
}
|
||||
|
||||
// updatePod is an event handler so it cannot block
|
||||
func (ww *WorkloadWatcher) updatePod(oldObj any, newObj any) {
|
||||
oldPod := oldObj.(*corev1.Pod)
|
||||
newPod := newObj.(*corev1.Pod)
|
||||
if oldPod.DeletionTimestamp == nil && newPod.DeletionTimestamp != nil {
|
||||
// this is just a mark, wait for actual deletion event
|
||||
return
|
||||
}
|
||||
|
||||
oldUpdated := latestUpdated(oldPod.ManagedFields)
|
||||
updated := latestUpdated(newPod.ManagedFields)
|
||||
if !updated.IsZero() && updated != oldUpdated {
|
||||
delta := time.Since(updated)
|
||||
podInformerLag.Observe(delta.Seconds())
|
||||
}
|
||||
|
||||
ww.log.Tracef("Updated pod %s.%s", newPod.Name, newPod.Namespace)
|
||||
go ww.submitPodUpdate(newPod, false)
|
||||
}
|
||||
|
||||
// addExternalWorkload is an event handler so it cannot block
|
||||
func (ww *WorkloadWatcher) addExternalWorkload(obj any) {
|
||||
externalWorkload := obj.(*ext.ExternalWorkload)
|
||||
ww.log.Tracef("Added externalworkload %s.%s", externalWorkload.Name, externalWorkload.Namespace)
|
||||
go ww.submitExternalWorkloadUpdate(externalWorkload, false)
|
||||
}
|
||||
|
||||
// deleteExternalWorkload is an event handler so it cannot block
|
||||
func (ww *WorkloadWatcher) deleteExternalWorkload(obj any) {
|
||||
externalWorkload, ok := obj.(*ext.ExternalWorkload)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
ww.log.Errorf("Couldn't get object from DeletedFinalStateUnknown %#v", obj)
|
||||
return
|
||||
}
|
||||
externalWorkload, ok = tombstone.Obj.(*ext.ExternalWorkload)
|
||||
if !ok {
|
||||
ww.log.Errorf("DeletedFinalStateUnknown contained object that is not an ExternalWorkload %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
ww.log.Tracef("Deleted externalworklod %s.%s", externalWorkload.Name, externalWorkload.Namespace)
|
||||
go ww.submitExternalWorkloadUpdate(externalWorkload, true)
|
||||
}
|
||||
|
||||
// updateExternalWorkload is an event handler so it cannot block
|
||||
func (ww *WorkloadWatcher) updateExternalWorkload(oldObj any, newObj any) {
|
||||
oldExternalWorkload := oldObj.(*ext.ExternalWorkload)
|
||||
newExternalWorkload := newObj.(*ext.ExternalWorkload)
|
||||
if oldExternalWorkload.DeletionTimestamp == nil && newExternalWorkload.DeletionTimestamp != nil {
|
||||
// this is just a mark, wait for actual deletion event
|
||||
return
|
||||
}
|
||||
|
||||
oldUpdated := latestUpdated(oldExternalWorkload.ManagedFields)
|
||||
updated := latestUpdated(newExternalWorkload.ManagedFields)
|
||||
if !updated.IsZero() && updated != oldUpdated {
|
||||
delta := time.Since(updated)
|
||||
externalWorkloadInformerLag.Observe(delta.Seconds())
|
||||
}
|
||||
|
||||
ww.log.Tracef("Updated pod %s.%s", newExternalWorkload.Name, newExternalWorkload.Namespace)
|
||||
go ww.submitExternalWorkloadUpdate(newExternalWorkload, false)
|
||||
}
|
||||
|
||||
func (ww *WorkloadWatcher) submitPodUpdate(pod *corev1.Pod, remove bool) {
|
||||
ww.mu.RLock()
|
||||
defer ww.mu.RUnlock()
|
||||
|
||||
submitPod := pod
|
||||
if remove {
|
||||
submitPod = nil
|
||||
}
|
||||
|
||||
for _, container := range pod.Spec.Containers {
|
||||
for _, containerPort := range container.Ports {
|
||||
if containerPort.ContainerPort != 0 {
|
||||
for _, pip := range pod.Status.PodIPs {
|
||||
if wp, ok := ww.getWorkloadPublisher(pip.IP, Port(containerPort.ContainerPort)); ok {
|
||||
wp.updatePod(submitPod)
|
||||
}
|
||||
}
|
||||
if len(pod.Status.PodIPs) == 0 && pod.Status.PodIP != "" {
|
||||
if wp, ok := ww.getWorkloadPublisher(pod.Status.PodIP, Port(containerPort.ContainerPort)); ok {
|
||||
wp.updatePod(submitPod)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if containerPort.HostPort != 0 {
|
||||
for _, hip := range pod.Status.HostIPs {
|
||||
if pp, ok := ww.getWorkloadPublisher(hip.IP, Port(containerPort.HostPort)); ok {
|
||||
pp.updatePod(submitPod)
|
||||
}
|
||||
}
|
||||
if len(pod.Status.HostIPs) == 0 && pod.Status.HostIP != "" {
|
||||
if pp, ok := ww.getWorkloadPublisher(pod.Status.HostIP, Port(containerPort.HostPort)); ok {
|
||||
pp.updatePod(submitPod)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ww *WorkloadWatcher) submitExternalWorkloadUpdate(externalWorkload *ext.ExternalWorkload, remove bool) {
|
||||
ww.mu.RLock()
|
||||
defer ww.mu.RUnlock()
|
||||
|
||||
submitWorkload := externalWorkload
|
||||
if remove {
|
||||
submitWorkload = nil
|
||||
}
|
||||
|
||||
for _, port := range externalWorkload.Spec.Ports {
|
||||
for _, ip := range externalWorkload.Spec.WorkloadIPs {
|
||||
if wp, ok := ww.getWorkloadPublisher(ip.Ip, Port(port.Port)); ok {
|
||||
wp.updateExternalWorkload(submitWorkload)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ww *WorkloadWatcher) updateServer(oldObj interface{}, newObj interface{}) {
|
||||
oldServer := oldObj.(*v1beta2.Server)
|
||||
newServer := newObj.(*v1beta2.Server)
|
||||
|
||||
oldUpdated := latestUpdated(oldServer.ManagedFields)
|
||||
updated := latestUpdated(newServer.ManagedFields)
|
||||
if !updated.IsZero() && updated != oldUpdated {
|
||||
delta := time.Since(updated)
|
||||
serverInformerLag.Observe(delta.Seconds())
|
||||
}
|
||||
|
||||
ww.updateServers(newObj)
|
||||
}
|
||||
|
||||
// updateServers triggers an Update() call to the listeners of the workloadPublishers
|
||||
// whose pod matches the Server's podSelector or whose externalworkload matches
|
||||
// the Server's externalworkload selection. This function is an event handler
|
||||
// so it cannot block.
|
||||
func (ww *WorkloadWatcher) updateServers(_ any) {
|
||||
ww.mu.RLock()
|
||||
defer ww.mu.RUnlock()
|
||||
|
||||
for _, wp := range ww.publishers {
|
||||
var opaquePorts map[uint32]struct{}
|
||||
if wp.pod != nil {
|
||||
opaquePorts = GetAnnotatedOpaquePorts(wp.pod, ww.defaultOpaquePorts)
|
||||
} else if wp.externalWorkload != nil {
|
||||
opaquePorts = GetAnnotatedOpaquePortsForExternalWorkload(wp.externalWorkload, ww.defaultOpaquePorts)
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
|
||||
_, isOpaque := opaquePorts[wp.port]
|
||||
// if port is annotated to be always opaque we can disregard Server updates
|
||||
if isOpaque {
|
||||
continue
|
||||
}
|
||||
|
||||
go func(wp *workloadPublisher) {
|
||||
wp.mu.RLock()
|
||||
defer wp.mu.RUnlock()
|
||||
|
||||
updated := false
|
||||
for _, listener := range wp.listeners {
|
||||
// the Server in question doesn't carry information about other
|
||||
// Servers that might target this workloadPublisher; createAddress()
|
||||
// queries all the relevant Servers to determine the full state
|
||||
addr, err := wp.createAddress()
|
||||
if err != nil {
|
||||
ww.log.Errorf("Error creating address for workload: %s", err)
|
||||
continue
|
||||
}
|
||||
if err = listener.Update(&addr); err != nil {
|
||||
ww.log.Warnf("Error sending update to listener: %s", err)
|
||||
continue
|
||||
}
|
||||
updated = true
|
||||
}
|
||||
if updated {
|
||||
wp.metrics.incUpdates()
|
||||
}
|
||||
}(wp)
|
||||
}
|
||||
}
|
||||
|
||||
// getOrNewWorkloadPublisher returns the workloadPublisher for the given target if it
|
||||
// exists. Otherwise, it creates a new one and returns it.
|
||||
func (ww *WorkloadWatcher) getOrNewWorkloadPublisher(service *ServiceID, hostname, ip string, port Port) (*workloadPublisher, error) {
|
||||
ww.mu.Lock()
|
||||
defer ww.mu.Unlock()
|
||||
|
||||
var pod *corev1.Pod
|
||||
var externalWorkload *ext.ExternalWorkload
|
||||
var err error
|
||||
if hostname != "" {
|
||||
pod, err = ww.getEndpointByHostname(hostname, service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ip = pod.Status.PodIP
|
||||
} else {
|
||||
pod, err = ww.getPodByPodIP(ip, port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if pod == nil {
|
||||
pod, err = ww.getPodByHostIP(ip, port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if pod == nil {
|
||||
externalWorkload, err = ww.getExternalWorkloadByIP(ip, port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ipPort := IPPort{ip, port}
|
||||
wp, ok := ww.publishers[ipPort]
|
||||
if !ok {
|
||||
wp = &workloadPublisher{
|
||||
defaultOpaquePorts: ww.defaultOpaquePorts,
|
||||
k8sAPI: ww.k8sAPI,
|
||||
metadataAPI: ww.metadataAPI,
|
||||
ip: ip,
|
||||
port: port,
|
||||
pod: pod,
|
||||
externalWorkload: externalWorkload,
|
||||
metrics: ipPortVecs.newMetrics(prometheus.Labels{
|
||||
"ip": ip,
|
||||
"port": strconv.FormatUint(uint64(port), 10),
|
||||
}),
|
||||
log: ww.log.WithFields(logging.Fields{
|
||||
"component": "workload-publisher",
|
||||
"ip": ip,
|
||||
"port": port,
|
||||
}),
|
||||
}
|
||||
ww.publishers[ipPort] = wp
|
||||
}
|
||||
return wp, nil
|
||||
}

func (ww *WorkloadWatcher) getWorkloadPublisher(ip string, port Port) (wp *workloadPublisher, ok bool) {
	ipPort := IPPort{ip, port}
	wp, ok = ww.publishers[ipPort]
	return
}

// getPodByPodIP returns a pod that maps to the given IP address in the pod network
func (ww *WorkloadWatcher) getPodByPodIP(podIP string, port uint32) (*corev1.Pod, error) {
	podIPPods, err := getIndexedPods(ww.k8sAPI, PodIPIndex, podIP)
	if err != nil {
		return nil, status.Error(codes.Unknown, err.Error())
	}
	if len(podIPPods) == 1 {
		ww.log.Debugf("found %s on the pod network", podIP)
		return podIPPods[0], nil
	}
	if len(podIPPods) > 1 {
		conflictingPods := []string{}
		for _, pod := range podIPPods {
			conflictingPods = append(conflictingPods, fmt.Sprintf("%s:%s", pod.Namespace, pod.Name))
		}
		ww.log.Warnf("found conflicting %s IP on the pod network: %s", podIP, strings.Join(conflictingPods, ","))
		return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting pod network IP %s", len(podIPPods), podIP)
	}

	ww.log.Debugf("no pod found for %s:%d", podIP, port)
	return nil, nil
}

// getPodByHostIP returns a pod that maps to the given IP address in the host
// network. It must have a container port that exposes `port` as a host port.
func (ww *WorkloadWatcher) getPodByHostIP(hostIP string, port uint32) (*corev1.Pod, error) {
	addr := net.JoinHostPort(hostIP, fmt.Sprintf("%d", port))
	hostIPPods, err := getIndexedPods(ww.k8sAPI, HostIPIndex, addr)
	if err != nil {
		return nil, status.Error(codes.Unknown, err.Error())
	}
	if len(hostIPPods) == 1 {
		ww.log.Debugf("found %s:%d on the host network", hostIP, port)
		return hostIPPods[0], nil
	}
	if len(hostIPPods) > 1 {
		conflictingPods := []string{}
		for _, pod := range hostIPPods {
			conflictingPods = append(conflictingPods, fmt.Sprintf("%s:%s", pod.Namespace, pod.Name))
		}
		ww.log.Warnf("found conflicting %s:%d endpoint on the host network: %s", hostIP, port, strings.Join(conflictingPods, ","))
		return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting host network endpoint %s:%d", len(hostIPPods), hostIP, port)
	}

	return nil, nil
}

// getExternalWorkloadByIP returns an externalworkload with the given IP
// address.
func (ww *WorkloadWatcher) getExternalWorkloadByIP(ip string, port uint32) (*ext.ExternalWorkload, error) {
	addr := net.JoinHostPort(ip, fmt.Sprintf("%d", port))
	workloads, err := getIndexedExternalWorkloads(ww.k8sAPI, ExternalWorkloadIPIndex, addr)
	if err != nil {
		return nil, status.Error(codes.Unknown, err.Error())
	}
	if len(workloads) == 0 {
		ww.log.Debugf("no externalworkload found for %s:%d", ip, port)
		return nil, nil
	}
	if len(workloads) == 1 {
		ww.log.Debugf("found externalworkload %s:%d", ip, port)
		return workloads[0], nil
	}
	if len(workloads) > 1 {
		conflictingWorkloads := []string{}
		for _, ew := range workloads {
			conflictingWorkloads = append(conflictingWorkloads, fmt.Sprintf("%s:%s", ew.Namespace, ew.Name))
		}
		ww.log.Warnf("found conflicting %s:%d externalworkload: %s", ip, port, strings.Join(conflictingWorkloads, ","))
		return nil, status.Errorf(codes.FailedPrecondition, "found %d externalworkloads with a conflicting ip %s:%d", len(workloads), ip, port)
	}

	return nil, nil
}

// getEndpointByHostname returns a pod that maps to the given hostname (or an
// instanceID). The hostname is generally the prefix of the pod's DNS name;
// since it may be arbitrary we need to look at the corresponding service's
// Endpoints object to see whether the hostname matches a pod.
func (ww *WorkloadWatcher) getEndpointByHostname(hostname string, svcID *ServiceID) (*corev1.Pod, error) {
	ep, err := ww.k8sAPI.Endpoint().Lister().Endpoints(svcID.Namespace).Get(svcID.Name)
	if err != nil {
		return nil, err
	}

	for _, subset := range ep.Subsets {
		for _, addr := range subset.Addresses {

			if hostname == addr.Hostname {
				if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
					podName := addr.TargetRef.Name
					podNamespace := addr.TargetRef.Namespace
					pod, err := ww.k8sAPI.Pod().Lister().Pods(podNamespace).Get(podName)
					if err != nil {
						return nil, err
					}
					return pod, nil
				}
				return nil, nil
			}
		}
	}

	return nil, status.Errorf(codes.NotFound, "no pod found in Endpoints %s/%s for hostname %s", svcID.Namespace, svcID.Name, hostname)
}

func (wp *workloadPublisher) subscribe(listener WorkloadUpdateListener) {
	wp.mu.Lock()
	defer wp.mu.Unlock()

	wp.listeners = append(wp.listeners, listener)
	wp.metrics.setSubscribers(len(wp.listeners))
}

func (wp *workloadPublisher) unsubscribe(listener WorkloadUpdateListener) {
	wp.mu.Lock()
	defer wp.mu.Unlock()

	for i, e := range wp.listeners {
		if e == listener {
			n := len(wp.listeners)
			wp.listeners[i] = wp.listeners[n-1]
			wp.listeners[n-1] = nil
			wp.listeners = wp.listeners[:n-1]
			break
		}
	}

	wp.metrics.setSubscribers(len(wp.listeners))
}

// updatePod creates an Address instance for the given pod, that is passed to
// the listener's Update() method, only if the pod's readiness state has
// changed. If the passed pod is nil, it means the pod (still referred to in
// wp.pod) has been deleted.
func (wp *workloadPublisher) updatePod(pod *corev1.Pod) {
	wp.mu.Lock()
	defer wp.mu.Unlock()

	// pod wasn't ready or there was no backing pod - check if passed pod is ready
	if wp.pod == nil {
		if pod == nil {
			wp.log.Trace("Pod deletion event already consumed - ignore")
			return
		}

		if !isRunningAndReady(pod) {
			wp.log.Tracef("Pod %s.%s not ready - ignore", pod.Name, pod.Namespace)
			return
		}

		wp.log.Debugf("Pod %s.%s became ready", pod.Name, pod.Namespace)
		wp.pod = pod
		updated := false
		for _, l := range wp.listeners {
			addr, err := wp.createAddress()
			if err != nil {
				wp.log.Errorf("Error creating address for pod: %s", err)
				continue
			}
			if err = l.Update(&addr); err != nil {
				wp.log.Warnf("Error sending update to listener: %s", err)
				continue
			}
			updated = true
		}
		if updated {
			wp.metrics.incUpdates()
		}
		return
	}

	// backing pod becoming unready or getting deleted
	if pod == nil || !isRunningAndReady(pod) {
		wp.log.Debugf("Pod %s.%s deleted or it became unready - remove", wp.pod.Name, wp.pod.Namespace)
		wp.pod = nil
		updated := false
		for _, l := range wp.listeners {
			addr, err := wp.createAddress()
			if err != nil {
				wp.log.Errorf("Error creating address for pod: %s", err)
				continue
			}
			if err = l.Update(&addr); err != nil {
				wp.log.Warnf("Error sending update to listener: %s", err)
				continue
			}
			updated = true
		}
		if updated {
			wp.metrics.incUpdates()
		}
		return
	}

	wp.log.Tracef("Ignored event on pod %s.%s", pod.Name, pod.Namespace)
}

// updateExternalWorkload creates an Address instance for the given externalworkload,
// that is passed to the listener's Update() method, only if the workload's
// readiness state has changed. If the passed workload is nil, it means the
// workload (still referred to in wp.externalWorkload) has been deleted.
func (wp *workloadPublisher) updateExternalWorkload(externalWorkload *ext.ExternalWorkload) {
	wp.mu.Lock()
	defer wp.mu.Unlock()

	// externalWorkload wasn't ready or there was no backing externalWorkload.
	// check if passed externalWorkload is ready
	if wp.externalWorkload == nil {
		if externalWorkload == nil {
			wp.log.Trace("ExternalWorkload deletion event already consumed - ignore")
			return
		}

		if !externalworkload.IsReady(externalWorkload) {
			wp.log.Tracef("ExternalWorkload %s.%s not ready - ignore", externalWorkload.Name, externalWorkload.Namespace)
			return
		}

		wp.log.Debugf("ExternalWorkload %s.%s became ready", externalWorkload.Name, externalWorkload.Namespace)
		wp.externalWorkload = externalWorkload
		updated := false
		for _, l := range wp.listeners {
			addr, err := wp.createAddress()
			if err != nil {
				wp.log.Errorf("Error creating address for externalWorkload: %s", err)
				continue
			}
			if err = l.Update(&addr); err != nil {
				wp.log.Warnf("Error sending update to listener: %s", err)
				continue
			}
			updated = true
		}
		if updated {
			wp.metrics.incUpdates()
		}
		return
	}

	// backing externalWorkload becoming unready or getting deleted
	if externalWorkload == nil || !externalworkload.IsReady(externalWorkload) {
		wp.log.Debugf("ExternalWorkload %s.%s deleted or it became unready - remove", wp.externalWorkload.Name, wp.externalWorkload.Namespace)
		wp.externalWorkload = nil
		updated := false
		for _, l := range wp.listeners {
			addr, err := wp.createAddress()
			if err != nil {
				wp.log.Errorf("Error creating address for externalWorkload: %s", err)
				continue
			}
			if err = l.Update(&addr); err != nil {
				wp.log.Warnf("Error sending update to listener: %s", err)
				continue
			}
			updated = true
		}
		if updated {
			wp.metrics.incUpdates()
		}
		return
	}

	wp.log.Tracef("Ignored event on externalWorkload %s.%s", externalWorkload.Name, externalWorkload.Namespace)
}

// createAddress returns an Address instance for the given ip, port and workload. It
// completes the ownership and opaque protocol information
func (wp *workloadPublisher) createAddress() (Address, error) {
	var ownerKind, ownerName string
	var err error
	if wp.pod != nil {
		ownerKind, ownerName, err = wp.metadataAPI.GetOwnerKindAndName(context.Background(), wp.pod, true)
		if err != nil {
			return Address{}, err
		}
	} else if wp.externalWorkload != nil {
		if len(wp.externalWorkload.GetOwnerReferences()) == 1 {
			ownerKind = wp.externalWorkload.GetOwnerReferences()[0].Kind
			ownerName = wp.externalWorkload.GetOwnerReferences()[0].Name
		}
	}

	address := Address{
		IP:               wp.ip,
		Port:             wp.port,
		Pod:              wp.pod,
		ExternalWorkload: wp.externalWorkload,
		OwnerName:        ownerName,
		OwnerKind:        ownerKind,
	}

	// Override opaqueProtocol if the endpoint's port is annotated as opaque
	if wp.pod != nil {
		opaquePorts := GetAnnotatedOpaquePorts(wp.pod, wp.defaultOpaquePorts)
		if _, ok := opaquePorts[wp.port]; ok {
			address.OpaqueProtocol = true
		} else {
			if err := SetToServerProtocol(wp.k8sAPI, &address, wp.port); err != nil {
				return Address{}, fmt.Errorf("failed to set address OpaqueProtocol: %w", err)
			}
		}
	} else if wp.externalWorkload != nil {
		opaquePorts := GetAnnotatedOpaquePortsForExternalWorkload(wp.externalWorkload, wp.defaultOpaquePorts)
		if _, ok := opaquePorts[wp.port]; ok {
			address.OpaqueProtocol = true
		} else {
			if err := SetToServerProtocolExternalWorkload(wp.k8sAPI, &address, wp.port); err != nil {
				return Address{}, fmt.Errorf("failed to set address OpaqueProtocol: %w", err)
			}
		}
	} else {
		if _, ok := wp.defaultOpaquePorts[wp.port]; ok {
			address.OpaqueProtocol = true
		}
	}

	return address, nil
}

// GetAnnotatedOpaquePorts returns the opaque ports for the pod given its
// annotations, or the default opaque ports if it's not annotated
func GetAnnotatedOpaquePorts(pod *corev1.Pod, defaultPorts map[uint32]struct{}) map[uint32]struct{} {
	if pod == nil {
		return defaultPorts
	}
	annotation, ok := pod.Annotations[consts.ProxyOpaquePortsAnnotation]
	if !ok {
		return defaultPorts
	}
	opaquePorts := make(map[uint32]struct{})
	if annotation != "" {
		for _, pr := range util.ParseContainerOpaquePorts(annotation, pod.Spec.Containers) {
			for _, port := range pr.Ports() {
				opaquePorts[uint32(port)] = struct{}{}
			}
		}
	}
	return opaquePorts
}

// GetAnnotatedOpaquePortsForExternalWorkload returns the opaque ports for the external workload given its
// annotations, or the default opaque ports if it's not annotated
func GetAnnotatedOpaquePortsForExternalWorkload(ew *ext.ExternalWorkload, defaultPorts map[uint32]struct{}) map[uint32]struct{} {
	if ew == nil {
		return defaultPorts
	}
	annotation, ok := ew.Annotations[consts.ProxyOpaquePortsAnnotation]
	if !ok {
		return defaultPorts
	}
	opaquePorts := make(map[uint32]struct{})
	if annotation != "" {
		for _, pr := range parseExternalWorkloadOpaquePorts(annotation, ew) {
			for _, port := range pr.Ports() {
				opaquePorts[uint32(port)] = struct{}{}
			}
		}
	}
	return opaquePorts
}
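
// The sketch below is not part of this change; it is a minimal illustration of
// how the helper above resolves the opaque-ports annotation. It assumes it
// sits in this same package so the existing ext and consts imports apply, and
// the workload literal is illustrative only.
func annotatedOpaquePortsSketch() map[uint32]struct{} {
	ew := &ext.ExternalWorkload{}
	// Comma-separated ports and ranges are both accepted; a named port would
	// be resolved against ew.Spec.Ports by isNamedInExternalWorkload below.
	ew.Annotations = map[string]string{
		consts.ProxyOpaquePortsAnnotation: "3306,4143-4144",
	}
	// Returns {3306, 4143, 4144}. Without the annotation, the defaultPorts
	// argument is returned unchanged.
	return GetAnnotatedOpaquePortsForExternalWorkload(ew, map[uint32]struct{}{})
}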

func parseExternalWorkloadOpaquePorts(override string, ew *ext.ExternalWorkload) []util.PortRange {
	portRanges := util.GetPortRanges(override)
	var values []util.PortRange
	for _, pr := range portRanges {
		port, named := isNamedInExternalWorkload(pr, ew)
		if named {
			values = append(values, util.PortRange{UpperBound: int(port), LowerBound: int(port)})
		} else {
			pr, err := util.ParsePortRange(pr)
			if err != nil {
				logging.Warnf("Invalid port range [%v]: %s", pr, err)
				continue
			}
			values = append(values, pr)
		}
	}
	return values
}

func isNamedInExternalWorkload(pr string, ew *ext.ExternalWorkload) (int32, bool) {
	for _, p := range ew.Spec.Ports {
		if p.Name == pr {
			return p.Port, true
		}
	}

	return 0, false
}

func isRunningAndReady(pod *corev1.Pod) bool {
	if pod == nil || pod.Status.Phase != corev1.PodRunning {
		return false
	}
	for _, condition := range pod.Status.Conditions {
		if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
			return true
		}
	}

	return false
}
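
// A minimal sketch (not part of this change) of the readiness rule above: a
// pod only qualifies when it is Running and carries a PodReady condition set
// to True. It relies only on the corev1 types already imported in this file.
func isRunningAndReadySketch() bool {
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			Phase: corev1.PodRunning,
			Conditions: []corev1.PodCondition{
				{Type: corev1.PodReady, Status: corev1.ConditionTrue},
			},
		},
	}
	// A nil pod, a Pending pod, or a Running pod without PodReady=True would
	// all return false.
	return isRunningAndReady(pod)
}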

@ -60,7 +60,7 @@ status:
	}

	k8sAPI.Sync(nil)
	pw := PodWatcher{
	ww := WorkloadWatcher{
		k8sAPI: k8sAPI,
		log: log.WithFields(log.Fields{
			"component": "pod-watcher",
@ -68,7 +68,7 @@ status:
	}

	// Get host IP pod that is mapped to the port `hostPort1`
	pod, err := pw.getPodByHostIP(hostIP, hostPort1)
	pod, err := ww.getPodByHostIP(hostIP, hostPort1)
	if err != nil {
		t.Fatalf("failed to get pod: %s", err)
	}
@ -81,7 +81,7 @@ status:
	// Get host IP pod that is mapped to the port `hostPort2`; this tests
	// that the indexer properly adds multiple containers from a single
	// pod.
	pod, err = pw.getPodByHostIP(hostIP, hostPort2)
	pod, err = ww.getPodByHostIP(hostIP, hostPort2)
	if err != nil {
		t.Fatalf("failed to get pod: %s", err)
	}
@ -92,7 +92,7 @@ status:
		t.Fatalf("expected pod name to be %s, but got %s", expectedPodName, pod.Name)
	}
	// Get host IP pod with unmapped host port
	pod, err = pw.getPodByHostIP(hostIP, 12347)
	pod, err = ww.getPodByHostIP(hostIP, 12347)
	if err != nil {
		t.Fatalf("expected no error when getting host IP pod with unmapped host port, but got: %s", err)
	}
@ -100,7 +100,7 @@ status:
		t.Fatal("expected no pod to be found with unmapped host port")
	}
	// Get pod IP pod and expect an error
	_, err = pw.getPodByPodIP(podIP, 12346)
	_, err = ww.getPodByPodIP(podIP, 12346)
	if err == nil {
		t.Fatal("expected error when getting by pod IP and unmapped host port, but got none")
	}