mirror of https://github.com/linkerd/linkerd2.git
804 lines
23 KiB
Go
804 lines
23 KiB
Go
package destination
|
|
|
|
import (
|
|
"fmt"
|
|
"net/netip"
|
|
"reflect"
|
|
|
|
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
|
|
"github.com/linkerd/linkerd2-proxy-api/go/net"
|
|
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
|
|
ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
|
|
"github.com/linkerd/linkerd2/controller/k8s"
|
|
"github.com/linkerd/linkerd2/pkg/addr"
|
|
pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
logging "github.com/sirupsen/logrus"
|
|
corev1 "k8s.io/api/core/v1"
|
|
)
|
|
|
|
const (
|
|
defaultWeight uint32 = 10000
|
|
|
|
// inboundListenAddr is the environment variable holding the inbound
|
|
// listening address for the proxy container.
|
|
envInboundListenAddr = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR"
|
|
envAdminListenAddr = "LINKERD2_PROXY_ADMIN_LISTEN_ADDR"
|
|
envControlListenAddr = "LINKERD2_PROXY_CONTROL_LISTEN_ADDR"
|
|
|
|
updateQueueCapacity = 100
|
|
|
|
defaultProxyInboundPort = 4143
|
|
)
|
|
|
|
// endpointTranslator satisfies EndpointUpdateListener and translates updates
|
|
// into Destination.Get messages.
|
|
type (
|
|
endpointTranslator struct {
|
|
controllerNS string
|
|
identityTrustDomain string
|
|
nodeTopologyZone string
|
|
nodeName string
|
|
defaultOpaquePorts map[uint32]struct{}
|
|
|
|
forceOpaqueTransport,
|
|
enableH2Upgrade,
|
|
enableEndpointFiltering,
|
|
enableIPv6,
|
|
|
|
extEndpointZoneWeights bool
|
|
|
|
meshedHTTP2ClientParams *pb.Http2ClientParams
|
|
|
|
availableEndpoints watcher.AddressSet
|
|
filteredSnapshot watcher.AddressSet
|
|
stream pb.Destination_GetServer
|
|
endStream chan struct{}
|
|
log *logging.Entry
|
|
overflowCounter prometheus.Counter
|
|
|
|
updates chan interface{}
|
|
stop chan struct{}
|
|
}
|
|
|
|
addUpdate struct {
|
|
set watcher.AddressSet
|
|
}
|
|
|
|
removeUpdate struct {
|
|
set watcher.AddressSet
|
|
}
|
|
|
|
noEndpointsUpdate struct {
|
|
exists bool
|
|
}
|
|
)
|
|
|
|
var updatesQueueOverflowCounter = promauto.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Name: "endpoint_updates_queue_overflow",
|
|
Help: "A counter incremented whenever the endpoint updates queue overflows",
|
|
},
|
|
[]string{
|
|
"service",
|
|
},
|
|
)
|
|
|
|
func newEndpointTranslator(
|
|
controllerNS string,
|
|
identityTrustDomain string,
|
|
forceOpaqueTransport,
|
|
enableH2Upgrade,
|
|
enableEndpointFiltering,
|
|
enableIPv6,
|
|
extEndpointZoneWeights bool,
|
|
meshedHTTP2ClientParams *pb.Http2ClientParams,
|
|
service string,
|
|
srcNodeName string,
|
|
defaultOpaquePorts map[uint32]struct{},
|
|
k8sAPI *k8s.MetadataAPI,
|
|
stream pb.Destination_GetServer,
|
|
endStream chan struct{},
|
|
log *logging.Entry,
|
|
) *endpointTranslator {
|
|
log = log.WithFields(logging.Fields{
|
|
"component": "endpoint-translator",
|
|
"service": service,
|
|
})
|
|
|
|
nodeTopologyZone, err := getNodeTopologyZone(k8sAPI, srcNodeName)
|
|
if err != nil {
|
|
log.Errorf("Failed to get node topology zone for node %s: %s", srcNodeName, err)
|
|
}
|
|
availableEndpoints := newEmptyAddressSet()
|
|
|
|
filteredSnapshot := newEmptyAddressSet()
|
|
|
|
return &endpointTranslator{
|
|
controllerNS,
|
|
identityTrustDomain,
|
|
nodeTopologyZone,
|
|
srcNodeName,
|
|
defaultOpaquePorts,
|
|
forceOpaqueTransport,
|
|
enableH2Upgrade,
|
|
enableEndpointFiltering,
|
|
enableIPv6,
|
|
extEndpointZoneWeights,
|
|
meshedHTTP2ClientParams,
|
|
|
|
availableEndpoints,
|
|
filteredSnapshot,
|
|
stream,
|
|
endStream,
|
|
log,
|
|
updatesQueueOverflowCounter.With(prometheus.Labels{"service": service}),
|
|
make(chan interface{}, updateQueueCapacity),
|
|
make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (et *endpointTranslator) Add(set watcher.AddressSet) {
|
|
et.enqueueUpdate(&addUpdate{set})
|
|
}
|
|
|
|
func (et *endpointTranslator) Remove(set watcher.AddressSet) {
|
|
et.enqueueUpdate(&removeUpdate{set})
|
|
}
|
|
|
|
func (et *endpointTranslator) NoEndpoints(exists bool) {
|
|
et.enqueueUpdate(&noEndpointsUpdate{exists})
|
|
}
|
|
|
|
// Add, Remove, and NoEndpoints are called from a client-go informer callback
|
|
// and therefore must not block. For each of these, we enqueue an update in
|
|
// a channel so that it can be processed asyncronously. To ensure that enqueuing
|
|
// does not block, we first check to see if there is capacity in the buffered
|
|
// channel. If there is not, we drop the update and signal to the stream that
|
|
// it has fallen too far behind and should be closed.
|
|
func (et *endpointTranslator) enqueueUpdate(update interface{}) {
|
|
select {
|
|
case et.updates <- update:
|
|
// Update has been successfully enqueued.
|
|
default:
|
|
// We are unable to enqueue because the channel does not have capacity.
|
|
// The stream has fallen too far behind and should be closed.
|
|
et.overflowCounter.Inc()
|
|
select {
|
|
case <-et.endStream:
|
|
// The endStream channel has already been closed so no action is
|
|
// necessary.
|
|
default:
|
|
et.log.Error("endpoint update queue full; aborting stream")
|
|
close(et.endStream)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Start initiates a goroutine which processes update events off of the
|
|
// endpointTranslator's internal queue and sends to the grpc stream as
|
|
// appropriate. The goroutine calls several non-thread-safe functions (including
|
|
// Send) and therefore, Start must not be called more than once.
|
|
func (et *endpointTranslator) Start() {
|
|
go func() {
|
|
for {
|
|
select {
|
|
case update, ok := <-et.updates:
|
|
if !ok {
|
|
return
|
|
}
|
|
et.processUpdate(update)
|
|
case <-et.stop:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Stop terminates the goroutine started by Start.
|
|
func (et *endpointTranslator) Stop() {
|
|
close(et.stop)
|
|
}
|
|
|
|
// DrainAndStop closes the updates channel, causing the goroutine started by
|
|
// Start to terminate after processing all remaining updates.
|
|
func (et *endpointTranslator) DrainAndStop() {
|
|
close(et.updates)
|
|
}
|
|
|
|
func (et *endpointTranslator) processUpdate(update interface{}) {
|
|
switch update := update.(type) {
|
|
case *addUpdate:
|
|
et.add(update.set)
|
|
case *removeUpdate:
|
|
et.remove(update.set)
|
|
case *noEndpointsUpdate:
|
|
et.noEndpoints(update.exists)
|
|
}
|
|
}
|
|
|
|
func (et *endpointTranslator) add(set watcher.AddressSet) {
|
|
for id, address := range set.Addresses {
|
|
et.availableEndpoints.Addresses[id] = address
|
|
}
|
|
|
|
et.availableEndpoints.Labels = set.Labels
|
|
et.availableEndpoints.LocalTrafficPolicy = set.LocalTrafficPolicy
|
|
|
|
et.sendFilteredUpdate()
|
|
}
|
|
|
|
func (et *endpointTranslator) remove(set watcher.AddressSet) {
|
|
for id := range set.Addresses {
|
|
delete(et.availableEndpoints.Addresses, id)
|
|
}
|
|
|
|
et.sendFilteredUpdate()
|
|
}
|
|
|
|
func (et *endpointTranslator) noEndpoints(exists bool) {
|
|
et.log.Debugf("NoEndpoints(%+v)", exists)
|
|
|
|
et.availableEndpoints.Addresses = map[watcher.ID]watcher.Address{}
|
|
|
|
et.sendFilteredUpdate()
|
|
}
|
|
|
|
func (et *endpointTranslator) sendFilteredUpdate() {
|
|
filtered := et.filterAddresses()
|
|
filtered = et.selectAddressFamily(filtered)
|
|
diffAdd, diffRemove := et.diffEndpoints(filtered)
|
|
|
|
if len(diffAdd.Addresses) > 0 {
|
|
et.sendClientAdd(diffAdd)
|
|
}
|
|
if len(diffRemove.Addresses) > 0 {
|
|
et.sendClientRemove(diffRemove)
|
|
}
|
|
|
|
et.filteredSnapshot = filtered
|
|
}
|
|
|
|
func (et *endpointTranslator) selectAddressFamily(addresses watcher.AddressSet) watcher.AddressSet {
|
|
filtered := make(map[watcher.ID]watcher.Address)
|
|
for id, addr := range addresses.Addresses {
|
|
if id.IPFamily == corev1.IPv6Protocol && !et.enableIPv6 {
|
|
continue
|
|
}
|
|
|
|
if id.IPFamily == corev1.IPv4Protocol && et.enableIPv6 {
|
|
// Only consider IPv4 address for which there's not already an IPv6
|
|
// alternative
|
|
altID := id
|
|
altID.IPFamily = corev1.IPv6Protocol
|
|
if _, ok := addresses.Addresses[altID]; ok {
|
|
continue
|
|
}
|
|
}
|
|
|
|
filtered[id] = addr
|
|
}
|
|
|
|
return watcher.AddressSet{
|
|
Addresses: filtered,
|
|
Labels: addresses.Labels,
|
|
LocalTrafficPolicy: addresses.LocalTrafficPolicy,
|
|
}
|
|
}
|
|
|
|
// filterAddresses is responsible for filtering endpoints based on the node's
|
|
// topology zone. The client will only receive endpoints with the same
|
|
// consumption zone as the node. An endpoints consumption zone is set
|
|
// by its Hints field and can be different than its actual Topology zone.
|
|
// when service.spec.internalTrafficPolicy is set to local, Topology Aware
|
|
// Hints are not used.
|
|
func (et *endpointTranslator) filterAddresses() watcher.AddressSet {
|
|
filtered := make(map[watcher.ID]watcher.Address)
|
|
|
|
// If endpoint filtering is disabled, return all available addresses.
|
|
if !et.enableEndpointFiltering {
|
|
for k, v := range et.availableEndpoints.Addresses {
|
|
filtered[k] = v
|
|
}
|
|
return watcher.AddressSet{
|
|
Addresses: filtered,
|
|
Labels: et.availableEndpoints.Labels,
|
|
}
|
|
}
|
|
|
|
// If service.spec.internalTrafficPolicy is set to local, filter and return the addresses
|
|
// for local node only
|
|
if et.availableEndpoints.LocalTrafficPolicy {
|
|
et.log.Debugf("Filtering through addresses that should be consumed by node %s", et.nodeName)
|
|
for id, address := range et.availableEndpoints.Addresses {
|
|
if address.Pod != nil && address.Pod.Spec.NodeName == et.nodeName {
|
|
filtered[id] = address
|
|
}
|
|
}
|
|
et.log.Debugf("Filtered from %d to %d addresses", len(et.availableEndpoints.Addresses), len(filtered))
|
|
return watcher.AddressSet{
|
|
Addresses: filtered,
|
|
Labels: et.availableEndpoints.Labels,
|
|
LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
|
|
}
|
|
}
|
|
// If any address does not have a hint, then all hints are ignored and all
|
|
// available addresses are returned. This replicates kube-proxy behavior
|
|
// documented in the KEP: https://github.com/kubernetes/enhancements/blob/master/keps/sig-network/2433-topology-aware-hints/README.md#kube-proxy
|
|
for _, address := range et.availableEndpoints.Addresses {
|
|
if len(address.ForZones) == 0 {
|
|
for k, v := range et.availableEndpoints.Addresses {
|
|
filtered[k] = v
|
|
}
|
|
et.log.Debugf("Hints not available on endpointslice. Zone Filtering disabled. Falling back to routing to all pods")
|
|
return watcher.AddressSet{
|
|
Addresses: filtered,
|
|
Labels: et.availableEndpoints.Labels,
|
|
LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
|
|
}
|
|
}
|
|
}
|
|
|
|
// Each address that has a hint matching the node's zone should be added
|
|
// to the set of addresses that will be returned.
|
|
et.log.Debugf("Filtering through addresses that should be consumed by zone %s", et.nodeTopologyZone)
|
|
for id, address := range et.availableEndpoints.Addresses {
|
|
for _, zone := range address.ForZones {
|
|
if zone.Name == et.nodeTopologyZone {
|
|
filtered[id] = address
|
|
}
|
|
}
|
|
}
|
|
if len(filtered) > 0 {
|
|
et.log.Debugf("Filtered from %d to %d addresses", len(et.availableEndpoints.Addresses), len(filtered))
|
|
return watcher.AddressSet{
|
|
Addresses: filtered,
|
|
Labels: et.availableEndpoints.Labels,
|
|
LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
|
|
}
|
|
}
|
|
|
|
// If there were no filtered addresses, then fall to using endpoints from
|
|
// all zones.
|
|
for k, v := range et.availableEndpoints.Addresses {
|
|
filtered[k] = v
|
|
}
|
|
return watcher.AddressSet{
|
|
Addresses: filtered,
|
|
Labels: et.availableEndpoints.Labels,
|
|
LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
|
|
}
|
|
}
|
|
|
|
// diffEndpoints calculates the difference between the filtered set of
|
|
// endpoints in the current (Add/Remove) operation and the snapshot of
|
|
// previously filtered endpoints. This diff allows the client to receive only
|
|
// the endpoints that match the topological zone, by adding new endpoints and
|
|
// removing stale ones.
|
|
func (et *endpointTranslator) diffEndpoints(filtered watcher.AddressSet) (watcher.AddressSet, watcher.AddressSet) {
|
|
add := make(map[watcher.ID]watcher.Address)
|
|
remove := make(map[watcher.ID]watcher.Address)
|
|
|
|
for id, new := range filtered.Addresses {
|
|
old, ok := et.filteredSnapshot.Addresses[id]
|
|
if !ok {
|
|
add[id] = new
|
|
} else if !reflect.DeepEqual(old, new) {
|
|
add[id] = new
|
|
}
|
|
}
|
|
|
|
for id, address := range et.filteredSnapshot.Addresses {
|
|
if _, ok := filtered.Addresses[id]; !ok {
|
|
remove[id] = address
|
|
}
|
|
}
|
|
|
|
return watcher.AddressSet{
|
|
Addresses: add,
|
|
Labels: filtered.Labels,
|
|
},
|
|
watcher.AddressSet{
|
|
Addresses: remove,
|
|
Labels: filtered.Labels,
|
|
}
|
|
}
|
|
|
|
func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
|
|
addrs := []*pb.WeightedAddr{}
|
|
for _, address := range set.Addresses {
|
|
var (
|
|
wa *pb.WeightedAddr
|
|
opaquePorts map[uint32]struct{}
|
|
err error
|
|
)
|
|
if address.Pod != nil {
|
|
opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts)
|
|
wa, err = createWeightedAddr(address, opaquePorts,
|
|
et.forceOpaqueTransport, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.meshedHTTP2ClientParams)
|
|
if err != nil {
|
|
et.log.Errorf("Failed to translate Pod endpoints to weighted addr: %s", err)
|
|
continue
|
|
}
|
|
} else if address.ExternalWorkload != nil {
|
|
opaquePorts = watcher.GetAnnotatedOpaquePortsForExternalWorkload(address.ExternalWorkload, et.defaultOpaquePorts)
|
|
wa, err = createWeightedAddrForExternalWorkload(address, et.forceOpaqueTransport, opaquePorts, et.meshedHTTP2ClientParams)
|
|
if err != nil {
|
|
et.log.Errorf("Failed to translate ExternalWorkload endpoints to weighted addr: %s", err)
|
|
continue
|
|
}
|
|
} else {
|
|
// When there's no associated pod, we may still need to set metadata
|
|
// (especially for remote multi-cluster services).
|
|
var addr *net.TcpAddress
|
|
addr, err = toAddr(address)
|
|
if err != nil {
|
|
et.log.Errorf("Failed to translate endpoints to weighted addr: %s", err)
|
|
continue
|
|
}
|
|
|
|
var authOverride *pb.AuthorityOverride
|
|
if address.AuthorityOverride != "" {
|
|
authOverride = &pb.AuthorityOverride{
|
|
AuthorityOverride: address.AuthorityOverride,
|
|
}
|
|
}
|
|
wa = &pb.WeightedAddr{
|
|
Addr: addr,
|
|
Weight: defaultWeight,
|
|
AuthorityOverride: authOverride,
|
|
MetricLabels: map[string]string{},
|
|
}
|
|
|
|
if address.Identity != "" {
|
|
wa.TlsIdentity = &pb.TlsIdentity{
|
|
Strategy: &pb.TlsIdentity_DnsLikeIdentity_{
|
|
DnsLikeIdentity: &pb.TlsIdentity_DnsLikeIdentity{
|
|
Name: address.Identity,
|
|
},
|
|
},
|
|
}
|
|
if et.enableH2Upgrade {
|
|
wa.ProtocolHint = &pb.ProtocolHint{
|
|
Protocol: &pb.ProtocolHint_H2_{
|
|
H2: &pb.ProtocolHint_H2{},
|
|
},
|
|
}
|
|
}
|
|
wa.Http2 = et.meshedHTTP2ClientParams
|
|
}
|
|
}
|
|
|
|
if et.nodeTopologyZone != "" && address.Zone != nil {
|
|
if *address.Zone == et.nodeTopologyZone {
|
|
wa.MetricLabels["zone_locality"] = "local"
|
|
|
|
if et.extEndpointZoneWeights {
|
|
// EXPERIMENTAL: Use the endpoint weight field to indicate zonal
|
|
// preference so that local endoints are more heavily weighted.
|
|
wa.Weight *= 10
|
|
}
|
|
} else {
|
|
wa.MetricLabels["zone_locality"] = "remote"
|
|
}
|
|
} else {
|
|
wa.MetricLabels["zone_locality"] = "unknown"
|
|
}
|
|
|
|
addrs = append(addrs, wa)
|
|
}
|
|
|
|
add := &pb.Update{Update: &pb.Update_Add{
|
|
Add: &pb.WeightedAddrSet{
|
|
Addrs: addrs,
|
|
MetricLabels: set.Labels,
|
|
},
|
|
}}
|
|
|
|
et.log.Debugf("Sending destination add: %+v", add)
|
|
if err := et.stream.Send(add); err != nil {
|
|
et.log.Debugf("Failed to send address update: %s", err)
|
|
}
|
|
}
|
|
|
|
func (et *endpointTranslator) sendClientRemove(set watcher.AddressSet) {
|
|
addrs := []*net.TcpAddress{}
|
|
for _, address := range set.Addresses {
|
|
tcpAddr, err := toAddr(address)
|
|
if err != nil {
|
|
et.log.Errorf("Failed to translate endpoints to addr: %s", err)
|
|
continue
|
|
}
|
|
addrs = append(addrs, tcpAddr)
|
|
}
|
|
|
|
remove := &pb.Update{Update: &pb.Update_Remove{
|
|
Remove: &pb.AddrSet{
|
|
Addrs: addrs,
|
|
},
|
|
}}
|
|
|
|
et.log.Debugf("Sending destination remove: %+v", remove)
|
|
if err := et.stream.Send(remove); err != nil {
|
|
et.log.Debugf("Failed to send address update: %s", err)
|
|
}
|
|
}
|
|
|
|
func toAddr(address watcher.Address) (*net.TcpAddress, error) {
|
|
ip, err := addr.ParseProxyIP(address.IP)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &net.TcpAddress{
|
|
Ip: ip,
|
|
Port: address.Port,
|
|
}, nil
|
|
}
|
|
|
|
func createWeightedAddrForExternalWorkload(
|
|
address watcher.Address,
|
|
forceOpaqueTransport bool,
|
|
opaquePorts map[uint32]struct{},
|
|
http2 *pb.Http2ClientParams,
|
|
) (*pb.WeightedAddr, error) {
|
|
tcpAddr, err := toAddr(address)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
weightedAddr := pb.WeightedAddr{
|
|
Addr: tcpAddr,
|
|
Weight: defaultWeight,
|
|
MetricLabels: map[string]string{},
|
|
}
|
|
|
|
weightedAddr.MetricLabels = pkgK8s.GetExternalWorkloadLabels(address.OwnerKind, address.OwnerName, address.ExternalWorkload)
|
|
// If the address is not backed by an ExternalWorkload, there is no additional metadata
|
|
// to add.
|
|
if address.ExternalWorkload == nil {
|
|
return &weightedAddr, nil
|
|
}
|
|
|
|
weightedAddr.ProtocolHint = &pb.ProtocolHint{}
|
|
weightedAddr.Http2 = http2
|
|
|
|
_, opaquePort := opaquePorts[address.Port]
|
|
opaquePort = opaquePort || address.OpaqueProtocol
|
|
|
|
if forceOpaqueTransport || opaquePort {
|
|
port := getInboundPortFromExternalWorkload(&address.ExternalWorkload.Spec)
|
|
weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{InboundPort: port}
|
|
}
|
|
|
|
// If address is set as opaque by a Server, or its port is set as
|
|
// opaque by annotation or default value, then set the hinted protocol to
|
|
// Opaque.
|
|
if opaquePort {
|
|
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{
|
|
Opaque: &pb.ProtocolHint_Opaque{},
|
|
}
|
|
} else {
|
|
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_H2_{
|
|
H2: &pb.ProtocolHint_H2{},
|
|
}
|
|
}
|
|
|
|
// we assume external workloads support only SPIRE identity
|
|
weightedAddr.TlsIdentity = &pb.TlsIdentity{
|
|
Strategy: &pb.TlsIdentity_UriLikeIdentity_{
|
|
UriLikeIdentity: &pb.TlsIdentity_UriLikeIdentity{
|
|
Uri: address.ExternalWorkload.Spec.MeshTLS.Identity,
|
|
},
|
|
},
|
|
ServerName: &pb.TlsIdentity_DnsLikeIdentity{
|
|
Name: address.ExternalWorkload.Spec.MeshTLS.ServerName,
|
|
},
|
|
}
|
|
|
|
weightedAddr.MetricLabels = pkgK8s.GetExternalWorkloadLabels(address.OwnerKind, address.OwnerName, address.ExternalWorkload)
|
|
// Set a zone label, even if it is empty (for consistency).
|
|
z := ""
|
|
if address.Zone != nil {
|
|
z = *address.Zone
|
|
}
|
|
weightedAddr.MetricLabels["zone"] = z
|
|
|
|
return &weightedAddr, nil
|
|
}
|
|
|
|
func createWeightedAddr(
|
|
address watcher.Address,
|
|
opaquePorts map[uint32]struct{},
|
|
forceOpaqueTransport bool,
|
|
enableH2Upgrade bool,
|
|
identityTrustDomain string,
|
|
controllerNS string,
|
|
meshedHttp2 *pb.Http2ClientParams,
|
|
) (*pb.WeightedAddr, error) {
|
|
tcpAddr, err := toAddr(address)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
weightedAddr := pb.WeightedAddr{
|
|
Addr: tcpAddr,
|
|
Weight: defaultWeight,
|
|
MetricLabels: map[string]string{},
|
|
}
|
|
|
|
// If the address is not backed by a pod, there is no additional metadata
|
|
// to add.
|
|
if address.Pod == nil {
|
|
return &weightedAddr, nil
|
|
}
|
|
|
|
skippedInboundPorts := getPodSkippedInboundPortsAnnotations(address.Pod)
|
|
|
|
controllerNSLabel := address.Pod.Labels[pkgK8s.ControllerNSLabel]
|
|
sa, ns := pkgK8s.GetServiceAccountAndNS(address.Pod)
|
|
weightedAddr.MetricLabels = pkgK8s.GetPodLabels(address.OwnerKind, address.OwnerName, address.Pod)
|
|
|
|
// Set a zone label, even if it is empty (for consistency).
|
|
z := ""
|
|
if address.Zone != nil {
|
|
z = *address.Zone
|
|
}
|
|
weightedAddr.MetricLabels["zone"] = z
|
|
|
|
_, isSkippedInboundPort := skippedInboundPorts[address.Port]
|
|
|
|
if controllerNSLabel != "" && !isSkippedInboundPort {
|
|
weightedAddr.Http2 = meshedHttp2
|
|
weightedAddr.ProtocolHint = &pb.ProtocolHint{}
|
|
|
|
metaPorts, err := getPodMetaPorts(&address.Pod.Spec)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read pod meta ports: %w", err)
|
|
}
|
|
|
|
_, opaquePort := opaquePorts[address.Port]
|
|
opaquePort = opaquePort || address.OpaqueProtocol
|
|
_, isMetaPort := metaPorts[address.Port]
|
|
|
|
if !isMetaPort && (forceOpaqueTransport || opaquePort) {
|
|
port, err := getInboundPort(&address.Pod.Spec)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read inbound port: %w", err)
|
|
}
|
|
weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{InboundPort: port}
|
|
}
|
|
|
|
// If address is set as opaque by a Server, or its port is set as
|
|
// opaque by annotation or default value, then set the hinted protocol to
|
|
// Opaque.
|
|
if opaquePort {
|
|
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{
|
|
Opaque: &pb.ProtocolHint_Opaque{},
|
|
}
|
|
} else if enableH2Upgrade {
|
|
// If the pod is controlled by any Linkerd control plane, then it can be
|
|
// hinted that this destination knows H2 (and handles our orig-proto
|
|
// translation)
|
|
weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_H2_{
|
|
H2: &pb.ProtocolHint_H2{},
|
|
}
|
|
}
|
|
}
|
|
|
|
// If the pod is controlled by the same Linkerd control plane, then it can
|
|
// participate in identity with peers.
|
|
//
|
|
// TODO this should be relaxed to match a trust domain annotation so that
|
|
// multiple meshes can participate in identity if they share trust roots.
|
|
if identityTrustDomain != "" &&
|
|
controllerNSLabel == controllerNS &&
|
|
!isSkippedInboundPort {
|
|
|
|
id := fmt.Sprintf("%s.%s.serviceaccount.identity.%s.%s", sa, ns, controllerNSLabel, identityTrustDomain)
|
|
tlsId := &pb.TlsIdentity_DnsLikeIdentity{Name: id}
|
|
|
|
weightedAddr.TlsIdentity = &pb.TlsIdentity{
|
|
Strategy: &pb.TlsIdentity_DnsLikeIdentity_{
|
|
DnsLikeIdentity: tlsId,
|
|
},
|
|
ServerName: tlsId,
|
|
}
|
|
}
|
|
|
|
return &weightedAddr, nil
|
|
}
|
|
|
|
func getNodeTopologyZone(k8sAPI *k8s.MetadataAPI, srcNode string) (string, error) {
|
|
node, err := k8sAPI.Get(k8s.Node, srcNode)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if zone, ok := node.Labels[corev1.LabelTopologyZone]; ok {
|
|
return zone, nil
|
|
}
|
|
return "", nil
|
|
}
|
|
|
|
func newEmptyAddressSet() watcher.AddressSet {
|
|
return watcher.AddressSet{
|
|
Addresses: make(map[watcher.ID]watcher.Address),
|
|
Labels: make(map[string]string),
|
|
}
|
|
}
|
|
|
|
// getInboundPort gets the inbound port from the proxy container's environment
|
|
// variable.
|
|
func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
|
|
ports, err := getPodPorts(podSpec, map[string]struct{}{envInboundListenAddr: {}})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
port := ports[envInboundListenAddr]
|
|
if port == 0 {
|
|
return 0, fmt.Errorf("failed to find inbound port in %s", envInboundListenAddr)
|
|
}
|
|
return port, nil
|
|
}
|
|
|
|
func getPodMetaPorts(podSpec *corev1.PodSpec) (map[uint32]struct{}, error) {
|
|
ports, err := getPodPorts(podSpec, map[string]struct{}{
|
|
envAdminListenAddr: {},
|
|
envControlListenAddr: {},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
invertedPorts := map[uint32]struct{}{}
|
|
for _, port := range ports {
|
|
invertedPorts[port] = struct{}{}
|
|
}
|
|
return invertedPorts, nil
|
|
}
|
|
|
|
func getPodPorts(podSpec *corev1.PodSpec, addrEnvNames map[string]struct{}) (map[string]uint32, error) {
|
|
containers := append(podSpec.InitContainers, podSpec.Containers...)
|
|
for _, containerSpec := range containers {
|
|
ports := map[string]uint32{}
|
|
if containerSpec.Name != pkgK8s.ProxyContainerName {
|
|
continue
|
|
}
|
|
for _, envVar := range containerSpec.Env {
|
|
_, hasEnv := addrEnvNames[envVar.Name]
|
|
if !hasEnv {
|
|
continue
|
|
}
|
|
addrPort, err := netip.ParseAddrPort(envVar.Value)
|
|
if err != nil {
|
|
return map[string]uint32{}, fmt.Errorf("failed to parse inbound port for proxy container: %w", err)
|
|
}
|
|
|
|
ports[envVar.Name] = uint32(addrPort.Port())
|
|
}
|
|
if len(ports) != len(addrEnvNames) {
|
|
missingEnv := []string{}
|
|
for env := range ports {
|
|
_, hasEnv := addrEnvNames[env]
|
|
if !hasEnv {
|
|
missingEnv = append(missingEnv, env)
|
|
}
|
|
}
|
|
return map[string]uint32{}, fmt.Errorf("failed to find %s environment variables in proxy container", missingEnv)
|
|
}
|
|
return ports, nil
|
|
}
|
|
return map[string]uint32{}, fmt.Errorf("failed to find %s environment variables in any container for given pod spec", addrEnvNames)
|
|
}
|
|
|
|
// getInboundPortFromExternalWorkload gets the inbound port from the ExternalWorkload spec
|
|
// variable.
|
|
func getInboundPortFromExternalWorkload(ewSpec *ewv1beta1.ExternalWorkloadSpec) uint32 {
|
|
for _, p := range ewSpec.Ports {
|
|
if p.Name == pkgK8s.ProxyPortName {
|
|
return uint32(p.Port)
|
|
}
|
|
}
|
|
|
|
return defaultProxyInboundPort
|
|
}
|