discovery: handle endpoint slices from ExternalWorkload (#11939)

This alters the endpoint slices watcher to handle slices that reference ExternalWorkloads.
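
At a high level, the watcher now branches on each endpoint's `targetRef` kind: Pod-backed endpoints keep the existing path, while ExternalWorkload-backed endpoints are resolved through the ExternalWorkload lister and get their identity from the workload's `meshTls` spec. A rough, self-contained sketch of that dispatch (illustrative only, not the controller code itself, which is in the diff below):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1"
)

// resolve sketches the new dispatch: each endpoint is routed to a resolution
// path based on the kind of object its targetRef points at.
func resolve(es *discovery.EndpointSlice) {
	for _, ep := range es.Endpoints {
		if ep.TargetRef == nil {
			continue
		}
		switch ep.TargetRef.Kind {
		case "Pod":
			// existing path: look up the Pod; identity is derived from its
			// service account
			fmt.Printf("%v -> Pod %s\n", ep.Addresses, ep.TargetRef.Name)
		case "ExternalWorkload":
			// new path: look up the ExternalWorkload; identity and ports
			// come from spec.meshTls and spec.ports
			fmt.Printf("%v -> ExternalWorkload %s\n", ep.Addresses, ep.TargetRef.Name)
		}
	}
}

func main() {
	resolve(&discovery.EndpointSlice{
		Endpoints: []discovery.Endpoint{{
			Addresses: []string{"172.21.0.5"},
			TargetRef: &corev1.ObjectReference{Kind: "ExternalWorkload", Name: "my-external-workload"},
		}},
	})
}
```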

Testing
Apply the following resources:

```yaml
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
addressType: IPv4
metadata:
  name: my-external-workload
  namespace: mixed-env
  labels:
    kubernetes.io/service-name: test-1
endpoints:
- addresses:
  - 172.21.0.5
  conditions:
    ready: true
    serving: true
    terminating: false
  targetRef:
    kind: ExternalWorkload
    name: my-external-workload
ports:
- port: 8080
  name: http
---
apiVersion: workload.linkerd.io/v1alpha1
kind: ExternalWorkload
metadata:
  name: my-external-workload
  namespace: mixed-env
  labels:
    app: test
spec:
  meshTls:
    identity: "test"
    serverName: "test"
  workloadIPs:
  - ip: 172.21.0.5
  ports:
  - port: 8080
    name: http
---
apiVersion: v1
kind: Service
metadata:
  name: test-1
  namespace: mixed-env
spec:
  selector:
    app: test
  type: ClusterIP
  ports:
  - name: http
    port: 8080
    targetPort: 8080
    protocol: TCP

```

Observe endpoints:
```
linkerd dg endpoints test-1.mixed-env.svc.cluster.local:8080
```

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>

View File

@ -9,9 +9,11 @@ import (
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2-proxy-api/go/net"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
ewv1alpha1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1alpha1"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/addr"
pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
logging "github.com/sirupsen/logrus"
@ -379,7 +381,14 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts)
wa, err = createWeightedAddr(address, opaquePorts, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS)
if err != nil {
et.log.Errorf("Failed to translate endpoints to weighted addr: %s", err)
et.log.Errorf("Failed to translate Pod endpoints to weighted addr: %s", err)
continue
}
} else if address.ExternalWorkload != nil {
opaquePorts = getAnnotatedOpaquePortsForExternalWorkload(address.ExternalWorkload, et.defaultOpaquePorts)
wa, err = createWeightedAddrForExternalWorkload(address, opaquePorts)
if err != nil {
et.log.Errorf("Failed to translate ExternalWorkload endpoints to weighted addr: %s", err)
continue
}
} else {
@ -480,6 +489,74 @@ func toAddr(address watcher.Address) (*net.TcpAddress, error) {
}, nil
}
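// createWeightedAddrForExternalWorkload translates an ExternalWorkload-backed
// address into a WeightedAddr, deriving the TLS identity and server name from
// the workload's meshTls spec and the protocol hint from its opaque ports.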
func createWeightedAddrForExternalWorkload(
address watcher.Address,
opaquePorts map[uint32]struct{},
) (*pb.WeightedAddr, error) {
tcpAddr, err := toAddr(address)
if err != nil {
return nil, err
}
weightedAddr := pb.WeightedAddr{
Addr: tcpAddr,
Weight: defaultWeight,
MetricLabels: map[string]string{},
}
// If the address is not backed by an ExternalWorkload, there is no additional metadata
// to add.
if address.ExternalWorkload == nil {
return &weightedAddr, nil
}
weightedAddr.ProtocolHint = &pb.ProtocolHint{}
_, opaquePort := opaquePorts[address.Port]
// If address is set as opaque by a Server, or its port is set as
// opaque by annotation or default value, then set the hinted protocol to
// Opaque.
if address.OpaqueProtocol || opaquePort {
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{
Opaque: &pb.ProtocolHint_Opaque{},
}
port, err := getInboundPortFromExternalWorkload(&address.ExternalWorkload.Spec)
if err != nil {
return nil, fmt.Errorf("failed to read inbound port: %w", err)
}
weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
InboundPort: port,
}
} else {
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_H2_{
H2: &pb.ProtocolHint_H2{},
}
}
// we assume external workloads support only SPIRE identity
weightedAddr.TlsIdentity = &pb.TlsIdentity{
Strategy: &pb.TlsIdentity_UriLikeIdentity_{
UriLikeIdentity: &pb.TlsIdentity_UriLikeIdentity{
Uri: address.ExternalWorkload.Spec.MeshTls.Identity,
},
},
ServerName: &pb.TlsIdentity_DnsLikeIdentity{
Name: address.ExternalWorkload.Spec.MeshTls.ServerName,
},
}
weightedAddr.MetricLabels = pkgK8s.GetExternalWorkloadLabels(address.OwnerKind, address.OwnerName, address.ExternalWorkload)
// Set a zone label, even if it is empty (for consistency).
z := ""
if address.Zone != nil {
z = *address.Zone
}
weightedAddr.MetricLabels["zone"] = z
return &weightedAddr, nil
}
func createWeightedAddr(
address watcher.Address,
opaquePorts map[uint32]struct{},
@ -558,12 +635,13 @@ func createWeightedAddr(
!isSkippedInboundPort {
id := fmt.Sprintf("%s.%s.serviceaccount.identity.%s.%s", sa, ns, controllerNSLabel, identityTrustDomain)
tlsId := &pb.TlsIdentity_DnsLikeIdentity{Name: id}
weightedAddr.TlsIdentity = &pb.TlsIdentity{
Strategy: &pb.TlsIdentity_DnsLikeIdentity_{
DnsLikeIdentity: &pb.TlsIdentity_DnsLikeIdentity{
Name: id,
},
DnsLikeIdentity: tlsId,
},
ServerName: tlsId,
}
}
@ -609,3 +687,65 @@ func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
}
return 0, fmt.Errorf("failed to find %s environment variable in any container for given pod spec", envInboundListenAddr)
}
// getInboundPortFromExternalWorkload gets the proxy's inbound port from the
// ExternalWorkload spec's list of ports.
func getInboundPortFromExternalWorkload(ewSpec *ewv1alpha1.ExternalWorkloadSpec) (uint32, error) {
for _, p := range ewSpec.Ports {
if p.Name == pkgK8s.ProxyPortName {
return uint32(p.Port), nil
}
}
return 0, fmt.Errorf("failed to find %s port for given ExternalWorkloadSpec", pkgK8s.ProxyPortName)
}
// getAnnotatedOpaquePortsForExternalWorkload returns the opaque ports for the external workload given its
// annotations, or the default opaque ports if it's not annotated
func getAnnotatedOpaquePortsForExternalWorkload(ew *ewv1alpha1.ExternalWorkload, defaultPorts map[uint32]struct{}) map[uint32]struct{} {
if ew == nil {
return defaultPorts
}
annotation, ok := ew.Annotations[pkgK8s.ProxyOpaquePortsAnnotation]
if !ok {
return defaultPorts
}
opaquePorts := make(map[uint32]struct{})
if annotation != "" {
for _, pr := range parseExternalWorkloadOpaquePorts(annotation, ew) {
for _, port := range pr.Ports() {
opaquePorts[uint32(port)] = struct{}{}
}
}
}
return opaquePorts
}
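// parseExternalWorkloadOpaquePorts parses the opaque-ports annotation value
// into port ranges, resolving any named ports against the ExternalWorkload
// spec.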
func parseExternalWorkloadOpaquePorts(override string, ew *ewv1alpha1.ExternalWorkload) []util.PortRange {
portRanges := util.GetPortRanges(override)
var values []util.PortRange
for _, pr := range portRanges {
port, named := isNamedInExternalWorkload(pr, ew)
if named {
values = append(values, util.PortRange{UpperBound: int(port), LowerBound: int(port)})
} else {
pr, err := util.ParsePortRange(pr)
if err != nil {
logging.Warnf("Invalid port range [%v]: %s", pr, err)
continue
}
values = append(values, pr)
}
}
return values
}
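// isNamedInExternalWorkload returns the numeric value of a named port declared
// in the ExternalWorkload spec, and whether such a port exists.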
func isNamedInExternalWorkload(pr string, ew *ewv1alpha1.ExternalWorkload) (int32, bool) {
for _, p := range ew.Spec.Ports {
if p.Name == pr {
return p.Port, true
}
}
return 0, false
}

View File

@ -11,6 +11,7 @@ import (
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2-proxy-api/go/net"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
ewv1alpha1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1alpha1"
"github.com/linkerd/linkerd2/pkg/addr"
"github.com/linkerd/linkerd2/pkg/k8s"
corev1 "k8s.io/api/core/v1"
@ -101,6 +102,95 @@ var (
OpaqueProtocol: true,
}
ew1 = watcher.Address{
IP: "1.1.1.1",
Port: 1,
ExternalWorkload: &ewv1alpha1.ExternalWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "ew-1",
Namespace: "ns",
},
Spec: ewv1alpha1.ExternalWorkloadSpec{
MeshTls: ewv1alpha1.MeshTls{
Identity: "spiffe://some-domain/ew-1",
ServerName: "server.local",
},
},
},
OwnerKind: "workloadgroup",
OwnerName: "wg-name",
}
ew2 = watcher.Address{
IP: "1.1.1.2",
Port: 2,
ExternalWorkload: &ewv1alpha1.ExternalWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "ew-2",
Namespace: "ns",
Labels: map[string]string{
k8s.ControllerNSLabel: "linkerd",
k8s.ProxyDeploymentLabel: "deployment-name",
},
},
Spec: ewv1alpha1.ExternalWorkloadSpec{
MeshTls: ewv1alpha1.MeshTls{
Identity: "spiffe://some-domain/ew-2",
ServerName: "server.local",
},
},
},
}
ew3 = watcher.Address{
IP: "1.1.1.3",
Port: 3,
ExternalWorkload: &ewv1alpha1.ExternalWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "ew-3",
Namespace: "ns",
Labels: map[string]string{
k8s.ControllerNSLabel: "linkerd",
k8s.ProxyDeploymentLabel: "deployment-name",
},
},
Spec: ewv1alpha1.ExternalWorkloadSpec{
MeshTls: ewv1alpha1.MeshTls{
Identity: "spiffe://some-domain/ew-3",
ServerName: "server.local",
},
},
},
}
ewOpaque = watcher.Address{
IP: "1.1.1.4",
Port: 4,
ExternalWorkload: &ewv1alpha1.ExternalWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "pod4",
Namespace: "ns",
Annotations: map[string]string{
k8s.ProxyOpaquePortsAnnotation: "4",
},
},
Spec: ewv1alpha1.ExternalWorkloadSpec{
MeshTls: ewv1alpha1.MeshTls{
Identity: "spiffe://some-domain/ew-opaque",
ServerName: "server.local",
},
Ports: []ewv1alpha1.PortSpec{
{
Port: 4143,
Name: "linkerd-proxy",
},
},
},
},
OpaqueProtocol: true,
}
remoteGateway1 = watcher.Address{
IP: "1.1.1.1",
Port: 1,
@ -429,6 +519,136 @@ func TestEndpointTranslatorForPods(t *testing.T) {
})
}
func TestEndpointTranslatorExternalWorkloads(t *testing.T) {
t.Run("Sends one update for add and another for remove", func(t *testing.T) {
mockGetServer, translator := makeEndpointTranslator(t)
translator.Start()
defer translator.Stop()
translator.Add(mkAddressSetForExternalWorkloads(ew1, ew2))
translator.Remove(mkAddressSetForExternalWorkloads(ew2))
expectedNumUpdates := 2
<-mockGetServer.updatesReceived // Add
<-mockGetServer.updatesReceived // Remove
if len(mockGetServer.updatesReceived) != 0 {
t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
}
})
t.Run("Sends addresses as removed or added", func(t *testing.T) {
mockGetServer, translator := makeEndpointTranslator(t)
translator.Start()
defer translator.Stop()
translator.Add(mkAddressSetForExternalWorkloads(ew1, ew2, ew3))
translator.Remove(mkAddressSetForExternalWorkloads(ew3))
addressesAdded := (<-mockGetServer.updatesReceived).GetAdd().Addrs
actualNumberOfAdded := len(addressesAdded)
expectedNumberOfAdded := 3
if actualNumberOfAdded != expectedNumberOfAdded {
t.Fatalf("Expecting [%d] addresses to be added, got [%d]: %v", expectedNumberOfAdded, actualNumberOfAdded, addressesAdded)
}
addressesRemoved := (<-mockGetServer.updatesReceived).GetRemove().Addrs
actualNumberOfRemoved := len(addressesRemoved)
expectedNumberOfRemoved := 1
if actualNumberOfRemoved != expectedNumberOfRemoved {
t.Fatalf("Expecting [%d] addresses to be removed, got [%d]: %v", expectedNumberOfRemoved, actualNumberOfRemoved, addressesRemoved)
}
sort.Slice(addressesAdded, func(i, j int) bool {
return addressesAdded[i].GetAddr().Port < addressesAdded[j].GetAddr().Port
})
checkAddressAndWeight(t, addressesAdded[0], ew1, defaultWeight)
checkAddressAndWeight(t, addressesAdded[1], ew2, defaultWeight)
checkAddress(t, addressesRemoved[0], ew3)
})
t.Run("Sends metric labels with added addresses", func(t *testing.T) {
mockGetServer, translator := makeEndpointTranslator(t)
translator.Start()
defer translator.Stop()
translator.Add(mkAddressSetForExternalWorkloads(ew1))
update := <-mockGetServer.updatesReceived
actualGlobalMetricLabels := update.GetAdd().MetricLabels
expectedGlobalMetricLabels := map[string]string{"namespace": "service-ns", "service": "service-name"}
if diff := deep.Equal(actualGlobalMetricLabels, expectedGlobalMetricLabels); diff != nil {
t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedGlobalMetricLabels, actualGlobalMetricLabels)
}
actualAddedAddress1MetricLabels := update.GetAdd().Addrs[0].MetricLabels
expectedAddedAddress1MetricLabels := map[string]string{
"external_workload": "ew-1",
"zone": "",
"workloadgroup": "wg-name",
}
if diff := deep.Equal(actualAddedAddress1MetricLabels, expectedAddedAddress1MetricLabels); diff != nil {
t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedAddedAddress1MetricLabels, actualAddedAddress1MetricLabels)
}
})
t.Run("Sends TlsIdentity and Server Name when enabled", func(t *testing.T) {
expectedTLSIdentity := &pb.TlsIdentity{
Strategy: &pb.TlsIdentity_UriLikeIdentity_{
UriLikeIdentity: &pb.TlsIdentity_UriLikeIdentity{
Uri: "spiffe://some-domain/ew-1",
},
},
ServerName: &pb.TlsIdentity_DnsLikeIdentity{
Name: "server.local",
},
}
mockGetServer, translator := makeEndpointTranslator(t)
translator.Start()
defer translator.Stop()
translator.Add(mkAddressSetForExternalWorkloads(ew1))
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
if len(addrs) != 1 {
t.Fatalf("Expected [1] address returned, got %v", addrs)
}
actualTLSIdentity := addrs[0].GetTlsIdentity()
if diff := deep.Equal(actualTLSIdentity, expectedTLSIdentity); diff != nil {
t.Fatalf("Expected TlsIdentity to be [%v] but was [%v]", expectedTLSIdentity, actualTLSIdentity)
}
})
t.Run("Sends Opaque ProtocolHint for opaque ports", func(t *testing.T) {
expectedProtocolHint := &pb.ProtocolHint{
Protocol: &pb.ProtocolHint_Opaque_{
Opaque: &pb.ProtocolHint_Opaque{},
},
OpaqueTransport: &pb.ProtocolHint_OpaqueTransport{
InboundPort: 4143,
},
}
mockGetServer, translator := makeEndpointTranslator(t)
translator.Start()
defer translator.Stop()
translator.Add(mkAddressSetForExternalWorkloads(ewOpaque))
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
if len(addrs) != 1 {
t.Fatalf("Expected [1] address returned, got %v", addrs)
}
actualProtocolHint := addrs[0].GetProtocolHint()
if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
t.Fatalf("ProtocolHint: %v", diff)
}
})
}
func TestEndpointTranslatorTopologyAwareFilter(t *testing.T) {
t.Run("Sends one update for add and none for remove", func(t *testing.T) {
mockGetServer, translator := makeEndpointTranslator(t)
@ -575,6 +795,18 @@ func mkAddressSetForPods(podAddresses ...watcher.Address) watcher.AddressSet {
return set
}
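// mkAddressSetForExternalWorkloads builds an AddressSet keyed by
// ExternalWorkloadID from the given ExternalWorkload-backed addresses.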
func mkAddressSetForExternalWorkloads(ewAddresses ...watcher.Address) watcher.AddressSet {
set := watcher.AddressSet{
Addresses: make(map[watcher.PodID]watcher.Address),
Labels: map[string]string{"service": "service-name", "namespace": "service-ns"},
}
for _, ew := range ewAddresses {
id := watcher.ExternalWorkloadID{Name: ew.ExternalWorkload.Name, Namespace: ew.ExternalWorkload.Namespace}
set.Addresses[id] = ew
}
return set
}
func checkAddressAndWeight(t *testing.T, actual *pb.WeightedAddr, expected watcher.Address, weight uint32) {
t.Helper()

View File

@ -10,6 +10,7 @@ import (
"sync"
"time"
ewv1alpha1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1alpha1"
"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta2"
"github.com/linkerd/linkerd2/controller/k8s"
consts "github.com/linkerd/linkerd2/pkg/k8s"
@ -36,6 +37,7 @@ const (
)
const endpointTargetRefPod = "Pod"
const endpointTargetRefExternalWorkload = "ExternalWorkload"
type (
// Address represents an individual port on a specific endpoint.
@ -47,6 +49,7 @@ type (
IP string
Port Port
Pod *corev1.Pod
ExternalWorkload *ewv1alpha1.ExternalWorkload
OwnerName string
OwnerKind string
Identity string
@ -704,13 +707,8 @@ func (sp *servicePublisher) updateServer(server *v1beta2.Server, isAdd bool) {
sp.Lock()
defer sp.Unlock()
selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
if err != nil {
sp.log.Errorf("failed to create Selector: %s", err)
return
}
for _, pp := range sp.ports {
pp.updateServer(server, selector, isAdd)
pp.updateServer(server, isAdd)
}
}
@ -899,6 +897,7 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A
pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
continue
}
address.Zone = endpoint.Zone
if endpoint.Hints != nil {
zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones))
@ -909,6 +908,32 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A
}
}
if endpoint.TargetRef.Kind == endpointTargetRefExternalWorkload {
for _, IPAddr := range endpoint.Addresses {
address, id, err := pp.newExtRefAddress(resolvedPort, IPAddr, endpoint.TargetRef.Name, es.Namespace)
if err != nil {
pp.log.Errorf("Unable to create new address: %v", err)
continue
}
err = setToServerProtocolExternalWorkload(pp.k8sAPI, &address, resolvedPort)
if err != nil {
pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
continue
}
address.Zone = endpoint.Zone
if endpoint.Hints != nil {
zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones))
copy(zones, endpoint.Hints.ForZones)
address.ForZones = zones
}
addresses[id] = address
}
}
}
return AddressSet{
Addresses: addresses,
@ -1050,6 +1075,33 @@ func (pp *portPublisher) newPodRefAddress(endpointPort Port, endpointIP, podName
return addr, id, nil
}
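// newExtRefAddress builds an Address for an endpoint whose targetRef is an
// ExternalWorkload, resolving the workload through the ExternalWorkload lister
// and recording its owner, if any.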
func (pp *portPublisher) newExtRefAddress(endpointPort Port, endpointIP, externalWorkloadName, externalWorkloadNamespace string) (Address, ExternalWorkloadID, error) {
id := ExternalWorkloadID{
Name: externalWorkloadName,
Namespace: externalWorkloadNamespace,
}
ew, err := pp.k8sAPI.ExtWorkload().Lister().ExternalWorkloads(id.Namespace).Get(id.Name)
if err != nil {
return Address{}, ExternalWorkloadID{}, fmt.Errorf("unable to fetch ExternalWorkload %v: %w", id, err)
}
addr := Address{
IP: endpointIP,
Port: endpointPort,
ExternalWorkload: ew,
}
ownerRefs := ew.GetOwnerReferences()
if len(ownerRefs) == 1 {
parent := ownerRefs[0]
addr.OwnerName = parent.Name
addr.OwnerKind = strings.ToLower(parent.Kind)
}
return addr, id, nil
}
func (pp *portPublisher) resolveESTargetPort(slicePorts []discovery.EndpointPort) Port {
if slicePorts == nil {
return undefinedEndpointPort
@ -1180,12 +1232,21 @@ func (pp *portPublisher) unsubscribe(listener EndpointUpdateListener) {
pp.metrics.setSubscribers(len(pp.listeners))
}
func (pp *portPublisher) updateServer(server *v1beta2.Server, selector labels.Selector, isAdd bool) {
func (pp *portPublisher) updateServer(server *v1beta2.Server, isAdd bool) {
updated := false
for id, address := range pp.addresses.Addresses {
if address.Pod != nil && selector.Matches(labels.Set(address.Pod.Labels)) {
var portMatch bool
portMatch := false
if address.Pod != nil {
selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
if err != nil {
pp.log.Errorf("failed to create Selector: %s", err)
return
}
if !selector.Matches(labels.Set(address.Pod.Labels)) {
continue
}
switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(address.Port) {
@ -1202,16 +1263,46 @@ func (pp *portPublisher) updateServer(server *v1beta2.Server, selector labels.Se
default:
continue
}
if portMatch {
if isAdd && server.Spec.ProxyProtocol == opaqueProtocol {
address.OpaqueProtocol = true
} else {
address.OpaqueProtocol = false
} else if address.ExternalWorkload != nil {
selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector)
if err != nil {
pp.log.Errorf("failed to create Selector: %s", err)
return
}
if !selector.Matches(labels.Set(address.ExternalWorkload.Labels)) {
continue
}
switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(address.Port) {
portMatch = true
}
if pp.addresses.Addresses[id].OpaqueProtocol != address.OpaqueProtocol {
pp.addresses.Addresses[id] = address
updated = true
case intstr.String:
for _, p := range address.ExternalWorkload.Spec.Ports {
if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
portMatch = true
}
}
default:
continue
}
} else {
continue
}
if portMatch {
if isAdd && server.Spec.ProxyProtocol == opaqueProtocol {
address.OpaqueProtocol = true
} else {
address.OpaqueProtocol = false
}
if pp.addresses.Addresses[id].OpaqueProtocol != address.OpaqueProtocol {
pp.addresses.Addresses[id] = address
updated = true
}
}
}
@ -1406,6 +1497,47 @@ func SetToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error {
return nil
}
// setToServerProtocolExternalWorkload sets the address's OpaqueProtocol field based off any
// Servers that select it and override the expected protocol for ExternalWorkloads.
func setToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address, port Port) error {
if address.ExternalWorkload == nil {
return fmt.Errorf("endpoint not backed by ExternalWorkload: %s:%d", address.IP, address.Port)
}
servers, err := k8sAPI.Srv().Lister().Servers("").List(labels.Everything())
if err != nil {
return fmt.Errorf("failed to list Servers: %w", err)
}
for _, server := range servers {
selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector)
if err != nil {
return fmt.Errorf("failed to create Selector: %w", err)
}
if server.Spec.ProxyProtocol == opaqueProtocol && selector.Matches(labels.Set(address.ExternalWorkload.Labels)) {
var portMatch bool
switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(port) {
portMatch = true
}
case intstr.String:
for _, p := range address.ExternalWorkload.Spec.Ports {
if p.Port == int32(port) && p.Name == server.Spec.Port.StrVal {
portMatch = true
}
}
default:
continue
}
if portMatch {
address.OpaqueProtocol = true
return nil
}
}
}
return nil
}
func latestUpdated(managedFields []metav1.ManagedFieldsEntry) time.Time {
var latest time.Time
for _, field := range managedFields {

View File

@ -1340,6 +1340,418 @@ status:
}
}
func TestEndpointsWatcherWithEndpointSlicesExternalWorkload(t *testing.T) {
for _, tt := range []struct {
serviceType string
k8sConfigs []string
id ServiceID
hostname string
port Port
expectedAddresses []string
expectedNoEndpoints bool
expectedNoEndpointsServiceExists bool
expectedError bool
expectedLocalTrafficPolicy bool
}{
{
serviceType: "local services with EndpointSlice",
k8sConfigs: []string{`
kind: APIResourceList
apiVersion: v1
groupVersion: discovery.k8s.io/v1
resources:
- name: endpointslices
singularName: endpointslice
namespaced: true
kind: EndpointSlice
verbs:
- delete
- deletecollection
- get
- list
- patch
- create
- update
- watch
`, `
apiVersion: v1
kind: Service
metadata:
name: name-1
namespace: ns
spec:
type: LoadBalancer
ports:
- port: 8989
internalTrafficPolicy: Local`,
`
addressType: IPv4
apiVersion: discovery.k8s.io/v1
endpoints:
- addresses:
- 172.17.0.12
conditions:
ready: true
targetRef:
kind: ExternalWorkload
name: name-1-1
namespace: ns
topology:
kubernetes.io/hostname: node-1
- addresses:
- 172.17.0.19
conditions:
ready: true
targetRef:
kind: ExternalWorkload
name: name-1-2
namespace: ns
topology:
kubernetes.io/hostname: node-1
- addresses:
- 172.17.0.20
conditions:
ready: true
targetRef:
kind: ExternalWorkload
name: name-1-3
namespace: ns
topology:
kubernetes.io/hostname: node-2
- addresses:
- 172.17.0.21
conditions:
ready: true
topology:
kubernetes.io/hostname: node-2
kind: EndpointSlice
metadata:
labels:
kubernetes.io/service-name: name-1
name: name-1-bhnqh
namespace: ns
ports:
- name: ""
port: 8989`,
`
apiVersion: workload.linkerd.io/v1alpha1
kind: ExternalWorkload
metadata:
name: name-1-1
namespace: ns
status:
conditions:
ready: true`,
`
apiVersion: workload.linkerd.io/v1alpha1
kind: ExternalWorkload
metadata:
name: name-1-2
namespace: ns
status:
conditions:
ready: true`,
`
apiVersion: workload.linkerd.io/v1alpha1
kind: ExternalWorkload
metadata:
name: name-1-3
namespace: ns
status:
conditions:
ready: true`,
},
id: ExternalWorkloadID{Name: "name-1", Namespace: "ns"},
port: 8989,
expectedAddresses: []string{
"172.17.0.12:8989",
"172.17.0.19:8989",
"172.17.0.20:8989",
"172.17.0.21:8989",
},
expectedNoEndpoints: false,
expectedNoEndpointsServiceExists: false,
expectedError: false,
expectedLocalTrafficPolicy: true,
},
{
serviceType: "local services with missing addresses and EndpointSlice",
k8sConfigs: []string{`
kind: APIResourceList
apiVersion: v1
groupVersion: discovery.k8s.io/v1
resources:
- name: endpointslices
singularName: endpointslice
namespaced: true
kind: EndpointSlice
verbs:
- delete
- deletecollection
- get
- list
- patch
- create
- update
- watch
`, `
apiVersion: v1
kind: Service
metadata:
name: name-1
namespace: ns
spec:
type: LoadBalancer
ports:
- port: 8989`, `
addressType: IPv4
apiVersion: discovery.k8s.io/v1
endpoints:
- addresses:
- 172.17.0.23
conditions:
ready: true
targetRef:
kind: ExternalWorkload
name: name-1-1
namespace: ns
topology:
kubernetes.io/hostname: node-1
- addresses:
- 172.17.0.24
conditions:
ready: true
targetRef:
kind: ExternalWorkload
name: name-1-2
namespace: ns
topology:
kubernetes.io/hostname: node-1
- addresses:
- 172.17.0.25
conditions:
ready: true
targetRef:
kind: ExternalWorkload
name: name-1-3
namespace: ns
topology:
kubernetes.io/hostname: node-2
kind: EndpointSlice
metadata:
labels:
kubernetes.io/service-name: name-1
name: name1-f5fad
namespace: ns
ports:
- name: ""
port: 8989`, `
apiVersion: workload.linkerd.io/v1alpha1
kind: ExternalWorkload
metadata:
name: name-1-3
namespace: ns
status:
conditions:
ready: true`,
},
id: ServiceID{Name: "name-1", Namespace: "ns"},
port: 8989,
expectedAddresses: []string{"172.17.0.25:8989"},
expectedNoEndpoints: false,
expectedNoEndpointsServiceExists: false,
expectedError: false,
},
{
serviceType: "service with EndpointSlice without labels",
k8sConfigs: []string{`
kind: APIResourceList
apiVersion: v1
groupVersion: discovery.k8s.io/v1
resources:
- name: endpointslices
singularName: endpointslice
namespaced: true
kind: EndpointSlice
verbs:
- delete
- deletecollection
- get
- list
- patch
- create
- update
- watch
`, `
apiVersion: v1
kind: Service
metadata:
name: name-5
namespace: ns
spec:
type: LoadBalancer
ports:
- port: 8989`, `
addressType: IPv4
apiVersion: discovery.k8s.io/v1
endpoints:
- addresses:
- 172.17.0.12
conditions:
ready: true
hostname: name-1-1
targetRef:
kind: ExternalWorkload
name: name-1-1
namespace: ns
topology:
kubernetes.io/hostname: node-1
kind: EndpointSlice
metadata:
labels:
name: name-1-f5fad
namespace: ns
ports:
- name: ""
port: 8989`, `
apiVersion: workload.linkerd.io/v1alpha1
kind: ExternalWorkload
metadata:
name: name-1-1
namespace: ns
status:
conditions:
ready: true`,
},
id: ServiceID{Name: "name-5", Namespace: "ns"},
port: 8989,
expectedAddresses: []string{},
expectedNoEndpoints: true,
expectedNoEndpointsServiceExists: true,
expectedError: false,
},
{
serviceType: "service with IPv6 address type EndpointSlice",
k8sConfigs: []string{`
kind: APIResourceList
apiVersion: v1
groupVersion: discovery.k8s.io/v1
resources:
- name: endpointslices
singularName: endpointslice
namespaced: true
kind: EndpointSlice
verbs:
- delete
- deletecollection
- get
- list
- patch
- create
- update
- watch
`, `
apiVersion: v1
kind: Service
metadata:
name: name-5
namespace: ns
spec:
type: LoadBalancer
ports:
- port: 9000`, `
addressType: IPv6
apiVersion: discovery.k8s.io/v1
endpoints:
- addresses:
- 0:0:0:0:0:0:0:1
conditions:
ready: true
targetRef:
kind: ExternalWorkload
name: name-5-1
namespace: ns
topology:
kubernetes.io/hostname: node-1
kind: EndpointSlice
metadata:
labels:
name: name-5-f65dv
namespace: ns
ownerReferences:
- apiVersion: v1
kind: Service
name: name-5
ports:
- name: ""
port: 9000`, `
apiVersion: workload.linkerd.io/v1alpha1
kind: ExternalWorkload
metadata:
name: name-5-1
namespace: ns
status:
conditions:
ready: true`,
},
id: ServiceID{Name: "name-5", Namespace: "ns"},
port: 9000,
expectedAddresses: []string{},
expectedNoEndpoints: true,
expectedNoEndpointsServiceExists: true,
expectedError: false,
},
} {
tt := tt // pin
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
if err != nil {
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
}
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}
k8sAPI.Sync(nil)
metadataAPI.Sync(nil)
listener := newBufferingEndpointListener()
err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
if tt.expectedError && err == nil {
t.Fatal("Expected error but was ok")
}
if !tt.expectedError && err != nil {
t.Fatalf("Expected no error, got [%s]", err)
}
if listener.localTrafficPolicy != tt.expectedLocalTrafficPolicy {
t.Fatalf("Expected localTrafficPolicy [%v], got [%v]", tt.expectedLocalTrafficPolicy, listener.localTrafficPolicy)
}
listener.ExpectAdded(tt.expectedAddresses, t)
if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints {
t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
tt.expectedNoEndpoints, listener.endpointsAreNotCalled())
}
if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists {
t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]",
tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist())
}
})
}
}
func TestEndpointsWatcherDeletion(t *testing.T) {
k8sConfigs := []string{`
apiVersion: v1
@ -1570,7 +1982,7 @@ status:
id: ServiceID{Name: "name1", Namespace: "ns"},
port: 8989,
hostname: "name1-1",
objectToDelete: createTestEndpointSlice(),
objectToDelete: createTestEndpointSlice(consts.PodKind),
hasSliceAccess: true,
noEndpointsCalled: true,
},
@ -1580,7 +1992,7 @@ status:
id: ServiceID{Name: "name1", Namespace: "ns"},
port: 8989,
hostname: "name1-1",
objectToDelete: createTestEndpointSlice(),
objectToDelete: createTestEndpointSlice(consts.PodKind),
hasSliceAccess: true,
noEndpointsCalled: true,
},
@ -1590,7 +2002,177 @@ status:
id: ServiceID{Name: "name1", Namespace: "ns"},
port: 8989,
hostname: "name1-1",
objectToDelete: createTestEndpointSlice(),
objectToDelete: createTestEndpointSlice(consts.PodKind),
hasSliceAccess: true,
noEndpointsCalled: false,
},
} {
tt := tt // pin
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
if err != nil {
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
}
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}
k8sAPI.Sync(nil)
metadataAPI.Sync(nil)
listener := newBufferingEndpointListener()
err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
if err != nil {
t.Fatal(err)
}
watcher.deleteEndpointSlice(tt.objectToDelete)
if listener.endpointsAreNotCalled() != tt.noEndpointsCalled {
t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
tt.noEndpointsCalled, listener.endpointsAreNotCalled())
}
})
}
}
func TestEndpointsWatcherDeletionWithEndpointSlicesExternalWorkload(t *testing.T) {
k8sConfigsWithES := []string{`
kind: APIResourceList
apiVersion: v1
groupVersion: discovery.k8s.io/v1
resources:
- name: endpointslices
singularName: endpointslice
namespaced: true
kind: EndpointSlice
verbs:
- delete
- deletecollection
- get
- list
- patch
- create
- update
- watch
`, `
apiVersion: v1
kind: Service
metadata:
name: name1
namespace: ns
spec:
type: LoadBalancer
ports:
- port: 8989`, `
addressType: IPv4
apiVersion: discovery.k8s.io/v1
endpoints:
- addresses:
- 172.17.0.12
conditions:
ready: true
targetRef:
kind: ExternalWorkload
name: name1-1
namespace: ns
topology:
kubernetes.io/hostname: node-1
kind: EndpointSlice
metadata:
labels:
kubernetes.io/service-name: name1
name: name1-del
namespace: ns
ports:
- name: ""
port: 8989`, `
apiVersion: workload.linkerd.io/v1alpha1
kind: ExternalWorkload
metadata:
name: name1-1
namespace: ns
status:
conditions:
ready: true`}
k8sConfigWithMultipleES := append(k8sConfigsWithES, []string{`
addressType: IPv4
apiVersion: discovery.k8s.io/v1
endpoints:
- addresses:
- 172.17.0.13
conditions:
ready: true
targetRef:
kind: ExternalWorkload
name: name1-2
namespace: ns
topology:
kubernetes.io/hostname: node-1
kind: EndpointSlice
metadata:
labels:
kubernetes.io/service-name: name1
name: name1-live
namespace: ns
ports:
- name: ""
port: 8989`, `apiVersion: workload.linkerd.io/v1alpha1
kind: ExternalWorkload
metadata:
name: name1-2
namespace: ns
status:
conditions:
ready: true`}...)
for _, tt := range []struct {
serviceType string
k8sConfigs []string
id ServiceID
hostname string
port Port
objectToDelete interface{}
deletingServices bool
hasSliceAccess bool
noEndpointsCalled bool
}{
{
serviceType: "can delete an EndpointSlice",
k8sConfigs: k8sConfigsWithES,
id: ServiceID{Name: "name1", Namespace: "ns"},
port: 8989,
hostname: "name1-1",
objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind),
hasSliceAccess: true,
noEndpointsCalled: true,
},
{
serviceType: "can delete an EndpointSlice when wrapped in a DeletedFinalStateUnknown",
k8sConfigs: k8sConfigsWithES,
id: ServiceID{Name: "name1", Namespace: "ns"},
port: 8989,
hostname: "name1-1",
objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind),
hasSliceAccess: true,
noEndpointsCalled: true,
},
{
serviceType: "can delete an EndpointSlice when there are multiple ones",
k8sConfigs: k8sConfigWithMultipleES,
id: ServiceID{Name: "name1", Namespace: "ns"},
port: 8989,
hostname: "name1-1",
objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind),
hasSliceAccess: true,
noEndpointsCalled: false,
},
@ -1927,7 +2509,7 @@ func endpoints(identity string) *corev1.Endpoints {
}
}
func createTestEndpointSlice() *dv1.EndpointSlice {
func createTestEndpointSlice(targetRefKind string) *dv1.EndpointSlice {
return &dv1.EndpointSlice{
AddressType: "IPv4",
ObjectMeta: metav1.ObjectMeta{Name: "name1-del", Namespace: "ns", Labels: map[string]string{dv1.LabelServiceName: "name1"}},
@ -1935,7 +2517,7 @@ func createTestEndpointSlice() *dv1.EndpointSlice {
{
Addresses: []string{"172.17.0.12"},
Conditions: dv1.EndpointConditions{Ready: func(b bool) *bool { return &b }(true)},
TargetRef: &corev1.ObjectReference{Name: "name1-1", Namespace: "ns", Kind: "Pod"},
TargetRef: &corev1.ObjectReference{Name: "name1-1", Namespace: "ns", Kind: targetRefKind},
},
},
Ports: []dv1.EndpointPort{

View File

@ -37,6 +37,8 @@ type (
PodID = ID
// ProfileID is the namespace-qualified name of a service profile.
ProfileID = ID
// ExternalWorkloadID is the namespace-qualified name of an ExternalWorkload.
ExternalWorkloadID = ID
// Port is a numeric port.
Port = uint32

View File

@ -107,7 +107,7 @@ func Main(args []string) {
*kubeConfigPath,
true,
"local",
k8s.Endpoint, k8s.ES, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv,
k8s.Endpoint, k8s.ES, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, k8s.ExtWorkload,
)
} else {
k8sAPI, err = k8s.InitializeAPI(
@ -115,7 +115,7 @@ func Main(args []string) {
*kubeConfigPath,
true,
"local",
k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv,
k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, k8s.ExtWorkload,
)
}
if err != nil {

View File

@ -48,6 +48,7 @@ func NewFakeClusterScopedAPI(clientSet kubernetes.Interface, l5dClientSet l5dcrd
MWC,
NS,
Pod,
ExtWorkload,
RC,
RS,
SP,

View File

@ -55,6 +55,7 @@ const (
ServerKind = "Server"
HTTPRouteKind = "HTTPRoute"
ExtWorkloadKind = "ExternalWorkload"
PodKind = "Pod"
WorkloadAPIGroup = "workload.linkerd.io"
WorkloadAPIVersion = "v1alpha1"

View File

@ -8,6 +8,7 @@ package k8s
import (
"fmt"
ewv1alpha1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1alpha1"
"github.com/linkerd/linkerd2/pkg/version"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@ -513,6 +514,16 @@ func GetPodLabels(ownerKind, ownerName string, pod *corev1.Pod) map[string]strin
return labels
}
// GetExternalWorkloadLabels returns the set of prometheus owner labels for a given ExternalWorkload
func GetExternalWorkloadLabels(ownerKind, ownerName string, ew *ewv1alpha1.ExternalWorkload) map[string]string {
labels := map[string]string{"external_workload": ew.Name}
if ownerKind != "" && ownerName != "" {
labels[ownerKind] = ownerName
}
return labels
}
// IsMeshed returns whether a given Pod is in a given controller's service mesh.
func IsMeshed(pod *corev1.Pod, controllerNS string) bool {
return pod.Labels[ControllerNSLabel] == controllerNS