destination: Add optional experimental endpoint weighting (#11795)

This change adds a runtime flag to the destination controller,
--experimental-endpoint-zone-weights=true, that causes endpoints in the
local zone to receive higher weights. This feature is disabled by
default, since the weight value is not honored by proxies. No Helm
configuration is exposed yet, either.

This weighting is implemented in the endpoint translator. Tests are
added to confirm that the behavior is feature-gated.

Additionally, this PR adds the "zone" metric label to endpoint metadata
responses.
This commit is contained in:
Oliver Gould 2023-12-20 13:11:30 -08:00 committed by GitHub
parent e59ae0ff3b
commit 5e558ae3e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 191 additions and 89 deletions

View File

@ -20,6 +20,7 @@ import (
const (
defaultWeight uint32 = 10000
// envInboundListenAddr is the name of the environment variable holding the
// inbound listening address for the proxy container.
envInboundListenAddr = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR"
@ -31,13 +32,15 @@ const (
// into Destination.Get messages.
type (
endpointTranslator struct {
controllerNS string
identityTrustDomain string
enableH2Upgrade bool
nodeTopologyZone string
nodeName string
defaultOpaquePorts map[uint32]struct{}
enableEndpointFiltering bool
controllerNS string
identityTrustDomain string
nodeTopologyZone string
nodeName string
defaultOpaquePorts map[uint32]struct{}
enableH2Upgrade,
enableEndpointFiltering,
experimentalEndpointZoneWeights bool
availableEndpoints watcher.AddressSet
filteredSnapshot watcher.AddressSet
@ -76,11 +79,12 @@ var updatesQueueOverflowCounter = promauto.NewCounterVec(
func newEndpointTranslator(
controllerNS string,
identityTrustDomain string,
enableH2Upgrade bool,
enableH2Upgrade,
enableEndpointFiltering,
experimentalEndpointZoneWeights bool,
service string,
srcNodeName string,
defaultOpaquePorts map[uint32]struct{},
enableEndpointFiltering bool,
k8sAPI *k8s.MetadataAPI,
stream pb.Destination_GetServer,
endStream chan struct{},
@ -102,11 +106,12 @@ func newEndpointTranslator(
return &endpointTranslator{
controllerNS,
identityTrustDomain,
enableH2Upgrade,
nodeTopologyZone,
srcNodeName,
defaultOpaquePorts,
enableH2Upgrade,
enableEndpointFiltering,
experimentalEndpointZoneWeights,
availableEndpoints,
filteredSnapshot,
stream,
@ -373,17 +378,26 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
if address.Pod != nil {
opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts)
wa, err = createWeightedAddr(address, opaquePorts, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.log)
if err != nil {
et.log.Errorf("Failed to translate endpoints to weighted addr: %s", err)
continue
}
} else {
// When there's no associated pod, we may still need to set metadata
// (especially for remote multi-cluster services).
var addr *net.TcpAddress
addr, err = toAddr(address)
if err != nil {
et.log.Errorf("Failed to translate endpoints to weighted addr: %s", err)
continue
}
var authOverride *pb.AuthorityOverride
if address.AuthorityOverride != "" {
authOverride = &pb.AuthorityOverride{
AuthorityOverride: address.AuthorityOverride,
}
}
// handling address with no associated pod
var addr *net.TcpAddress
addr, err = toAddr(address)
wa = &pb.WeightedAddr{
Addr: addr,
Weight: defaultWeight,
@ -398,7 +412,6 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
},
},
}
// in this case we most likely have a proxy on the other side, so set protocol hint as well.
if et.enableH2Upgrade {
wa.ProtocolHint = &pb.ProtocolHint{
Protocol: &pb.ProtocolHint_H2_{
@ -408,10 +421,15 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
}
}
}
if err != nil {
et.log.Errorf("Failed to translate endpoints to weighted addr: %s", err)
continue
if et.experimentalEndpointZoneWeights {
// EXPERIMENTAL: Use the endpoint weight field to indicate zonal
// preference so that local endpoints are more heavily weighted.
if et.nodeTopologyZone != "" && address.Zone != nil && *address.Zone == et.nodeTopologyZone {
wa.Weight *= 10
}
}
addrs = append(addrs, wa)
}
@ -462,8 +480,14 @@ func toAddr(address watcher.Address) (*net.TcpAddress, error) {
}, nil
}
func createWeightedAddr(address watcher.Address, opaquePorts map[uint32]struct{}, enableH2Upgrade bool, identityTrustDomain string, controllerNS string, log *logging.Entry) (*pb.WeightedAddr, error) {
func createWeightedAddr(
address watcher.Address,
opaquePorts map[uint32]struct{},
enableH2Upgrade bool,
identityTrustDomain string,
controllerNS string,
log *logging.Entry,
) (*pb.WeightedAddr, error) {
tcpAddr, err := toAddr(address)
if err != nil {
return nil, err
@ -489,6 +513,14 @@ func createWeightedAddr(address watcher.Address, opaquePorts map[uint32]struct{}
controllerNSLabel := address.Pod.Labels[pkgK8s.ControllerNSLabel]
sa, ns := pkgK8s.GetServiceAccountAndNS(address.Pod)
weightedAddr.MetricLabels = pkgK8s.GetPodLabels(address.OwnerKind, address.OwnerName, address.Pod)
// Set a zone label, even if it is empty (for consistency).
z := ""
if address.Zone != nil {
z = *address.Zone
}
weightedAddr.MetricLabels["zone"] = z
_, isSkippedInboundPort := skippedInboundPorts[address.Port]
// If the pod is controlled by any Linkerd control plane, then it can be

View File

@ -346,8 +346,8 @@ func TestEndpointTranslatorForPods(t *testing.T) {
sort.Slice(addressesAdded, func(i, j int) bool {
return addressesAdded[i].GetAddr().Port < addressesAdded[j].GetAddr().Port
})
checkAddressAndWeight(t, addressesAdded[0], pod1)
checkAddressAndWeight(t, addressesAdded[1], pod2)
checkAddressAndWeight(t, addressesAdded[0], pod1, defaultWeight)
checkAddressAndWeight(t, addressesAdded[1], pod2, defaultWeight)
checkAddress(t, addressesRemoved[0], pod3)
})
@ -372,6 +372,7 @@ func TestEndpointTranslatorForPods(t *testing.T) {
"replicationcontroller": "rc-name",
"serviceaccount": "serviceaccount-name",
"control_plane_ns": "linkerd",
"zone": "",
}
if diff := deep.Equal(actualAddedAddress1MetricLabels, expectedAddedAddress1MetricLabels); diff != nil {
t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedAddedAddress1MetricLabels, actualAddedAddress1MetricLabels)
@ -428,7 +429,7 @@ func TestEndpointTranslatorForPods(t *testing.T) {
})
}
func TestEndpointTranslatorForZonedAddresses(t *testing.T) {
func TestEndpointTranslatorTopologyAwareFilter(t *testing.T) {
t.Run("Sends one update for add and none for remove", func(t *testing.T) {
mockGetServer, translator := makeEndpointTranslator(t)
translator.Start()
@ -449,6 +450,59 @@ func TestEndpointTranslatorForZonedAddresses(t *testing.T) {
})
}
// TestEndpointTranslatorExperimentalZoneWeights checks that zone-based
// endpoint weighting is feature-gated: with the flag off every endpoint keeps
// the default weight, and with the flag on only the endpoint whose zone
// matches the translator's node zone is weighted up.
func TestEndpointTranslatorExperimentalZoneWeights(t *testing.T) {
	// "west-1a" is the zone label carried by the node fixture declared in
	// makeEndpointTranslator, so addrA is the "local" endpoint and addrB is
	// the remote one.
	localZone, otherZone := "west-1a", "west-1b"
	addrA := watcher.Address{IP: "7.9.7.9", Port: 7979, Zone: &localZone}
	addrB := watcher.Address{IP: "9.7.9.7", Port: 9797, Zone: &otherZone}

	cases := []struct {
		name    string
		enabled bool
		weightA uint32
		weightB uint32
	}{
		{"Disabled", false, defaultWeight, defaultWeight},
		{"Applies weights", true, defaultWeight * 10, defaultWeight},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			server, translator := makeEndpointTranslator(t)
			translator.experimentalEndpointZoneWeights = tc.enabled
			translator.Start()
			defer translator.Stop()

			translator.Add(mkAddressSetForServices(addrA, addrB))

			update := <-server.updatesReceived
			got := update.GetAdd().GetAddrs()
			if len(got) != 2 {
				t.Fatalf("Expected [2] addresses returned, got %v", got)
			}

			// Order by port so that index 0 is addrA (7979) and index 1 is
			// addrB (9797) before checking the weights.
			sort.Slice(got, func(i, j int) bool {
				return got[i].GetAddr().Port < got[j].GetAddr().Port
			})
			checkAddressAndWeight(t, got[0], addrA, tc.weightA)
			checkAddressAndWeight(t, got[1], addrB, tc.weightB)
		})
	}
}
func TestEndpointTranslatorForLocalTrafficPolicy(t *testing.T) {
t.Run("Sends one update for add and none for remove", func(t *testing.T) {
mockGetServer, translator := makeEndpointTranslator(t)
@ -521,14 +575,18 @@ func mkAddressSetForPods(podAddresses ...watcher.Address) watcher.AddressSet {
return set
}
func checkAddressAndWeight(t *testing.T, actual *pb.WeightedAddr, expected watcher.Address) {
func checkAddressAndWeight(t *testing.T, actual *pb.WeightedAddr, expected watcher.Address, weight uint32) {
t.Helper()
checkAddress(t, actual.GetAddr(), expected)
if actual.GetWeight() != defaultWeight {
t.Fatalf("Expected weight [%+v] but got [%+v]", defaultWeight, actual.GetWeight())
if actual.GetWeight() != weight {
t.Fatalf("Expected weight [%+v] but got [%+v]", weight, actual.GetWeight())
}
}
func checkAddress(t *testing.T, actual *net.TcpAddress, expected watcher.Address) {
t.Helper()
expectedAddr, err := addr.ParseProxyIPV4(expected.IP)
expectedTCP := net.TcpAddress{
Ip: expectedAddr,

View File

@ -25,22 +25,29 @@ import (
)
type (
// Config holds the settings passed to NewServer to configure the
// destination server.
Config struct {
	// ControllerNS is the namespace in which Linkerd is installed.
	ControllerNS string
	// IdentityTrustDomain is the name suffix used for identities.
	IdentityTrustDomain string
	// ClusterDomain is the Kubernetes cluster domain.
	ClusterDomain string

	// EnableH2Upgrade enables transparently upgraded HTTP2 connections
	// among pods in the service mesh.
	EnableH2Upgrade bool
	// EnableEndpointSlices enables the usage of EndpointSlice informers
	// and resources.
	EnableEndpointSlices bool
	// ExperimentalEndpointZoneWeights enables setting endpoint weights
	// based on zone locality.
	ExperimentalEndpointZoneWeights bool

	// DefaultOpaquePorts is the set of ports treated as opaque by default.
	DefaultOpaquePorts map[uint32]struct{}
}
server struct {
pb.UnimplementedDestinationServer
pods *watcher.PodWatcher
endpoints *watcher.EndpointsWatcher
opaquePorts *watcher.OpaquePortsWatcher
profiles *watcher.ProfileWatcher
config Config
pods *watcher.PodWatcher
endpoints *watcher.EndpointsWatcher
opaquePorts *watcher.OpaquePortsWatcher
profiles *watcher.ProfileWatcher
clusterStore *watcher.ClusterStore
enableH2Upgrade bool
controllerNS string
identityTrustDomain string
clusterDomain string
defaultOpaquePorts map[uint32]struct{}
k8sAPI *k8s.API
metadataAPI *k8s.MetadataAPI
log *logging.Entry
@ -62,15 +69,10 @@ type (
// API.
func NewServer(
addr string,
controllerNS string,
identityTrustDomain string,
enableH2Upgrade bool,
enableEndpointSlices bool,
config Config,
k8sAPI *k8s.API,
metadataAPI *k8s.MetadataAPI,
clusterStore *watcher.ClusterStore,
clusterDomain string,
defaultOpaquePorts map[uint32]struct{},
shutdown <-chan struct{},
) (*grpc.Server, error) {
log := logging.WithFields(logging.Fields{
@ -84,15 +86,15 @@ func NewServer(
return nil, err
}
pods, err := watcher.NewPodWatcher(k8sAPI, metadataAPI, log, defaultOpaquePorts)
pods, err := watcher.NewPodWatcher(k8sAPI, metadataAPI, log, config.DefaultOpaquePorts)
if err != nil {
return nil, err
}
endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, log, enableEndpointSlices, "local")
endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, log, config.EnableEndpointSlices, "local")
if err != nil {
return nil, err
}
opaquePorts, err := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts)
opaquePorts, err := watcher.NewOpaquePortsWatcher(k8sAPI, log, config.DefaultOpaquePorts)
if err != nil {
return nil, err
}
@ -103,16 +105,12 @@ func NewServer(
srv := server{
pb.UnimplementedDestinationServer{},
config,
pods,
endpoints,
opaquePorts,
profiles,
clusterStore,
enableH2Upgrade,
controllerNS,
identityTrustDomain,
clusterDomain,
defaultOpaquePorts,
k8sAPI,
metadataAPI,
log,
@ -155,7 +153,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
return status.Errorf(codes.InvalidArgument, "IP queries not supported by Get API: host=%s", host)
}
service, instanceID, err := parseK8sServiceName(host, s.clusterDomain)
service, instanceID, err := parseK8sServiceName(host, s.config.ClusterDomain)
if err != nil {
log.Debugf("Invalid service %s", dest.GetPath())
return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
@ -184,13 +182,14 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
return status.Errorf(codes.NotFound, "Remote cluster not found: %s", cluster)
}
translator := newEndpointTranslator(
s.controllerNS,
s.config.ControllerNS,
remoteConfig.TrustDomain,
s.enableH2Upgrade,
s.config.EnableH2Upgrade,
false, // Disable endpoint filtering for remote discovery.
s.config.ExperimentalEndpointZoneWeights,
fmt.Sprintf("%s.%s.svc.%s:%d", remoteSvc, service.Namespace, remoteConfig.ClusterDomain, port),
token.NodeName,
s.defaultOpaquePorts,
false, // Disable endpoint filtering for remote discovery.
s.config.DefaultOpaquePorts,
s.metadataAPI,
stream,
streamEnd,
@ -214,13 +213,14 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
} else {
// Local discovery
translator := newEndpointTranslator(
s.controllerNS,
s.identityTrustDomain,
s.enableH2Upgrade,
s.config.ControllerNS,
s.config.IdentityTrustDomain,
s.config.EnableH2Upgrade,
true,
s.config.ExperimentalEndpointZoneWeights,
dest.GetPath(),
token.NodeName,
s.defaultOpaquePorts,
true,
s.config.DefaultOpaquePorts,
s.metadataAPI,
stream,
streamEnd,
@ -301,7 +301,7 @@ func (s *server) getProfileByIP(
return s.subscribeToEndpointProfile(nil, "", ip.String(), port, log, stream)
}
fqn := fmt.Sprintf("%s.%s.svc.%s", svcID.Name, svcID.Namespace, s.clusterDomain)
fqn := fmt.Sprintf("%s.%s.svc.%s", svcID.Name, svcID.Namespace, s.config.ClusterDomain)
return s.subscribeToServiceProfile(*svcID, token, fqn, port, log, stream)
}
@ -312,7 +312,7 @@ func (s *server) getProfileByName(
log *logging.Entry,
stream pb.Destination_GetProfileServer,
) error {
service, hostname, err := parseK8sServiceName(host, s.clusterDomain)
service, hostname, err := parseK8sServiceName(host, s.config.ClusterDomain)
if err != nil {
s.log.Debugf("Invalid service %s", host)
return status.Errorf(codes.InvalidArgument, "invalid service %q: %q", host, err)
@ -407,7 +407,7 @@ func (s *server) subscribeToServicesWithContext(
// The backup lookup ignores the context token to lookup any
// server-namespace-hosted profiles.
backupID, err := profileID(fqn, contextToken{}, s.clusterDomain)
backupID, err := profileID(fqn, contextToken{}, s.config.ClusterDomain)
if err != nil {
log.Debug("Invalid service")
return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err)
@ -419,7 +419,7 @@ func (s *server) subscribeToServicesWithContext(
}
defer s.profiles.Unsubscribe(backupID, backup)
primaryID, err := profileID(fqn, token, s.clusterDomain)
primaryID, err := profileID(fqn, token, s.config.ClusterDomain)
if err != nil {
log.Debug("Invalid service")
return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err)
@ -450,7 +450,7 @@ func (s *server) subscribeToServiceWithoutContext(
log *logging.Entry,
streamEnd <-chan struct{},
) error {
id, err := profileID(fqn, contextToken{}, s.clusterDomain)
id, err := profileID(fqn, contextToken{}, s.config.ClusterDomain)
if err != nil {
log.Debug("Invalid service")
return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err)
@ -485,10 +485,10 @@ func (s *server) subscribeToEndpointProfile(
stream pb.Destination_GetProfileServer,
) error {
translator := newEndpointProfileTranslator(
s.enableH2Upgrade,
s.controllerNS,
s.identityTrustDomain,
s.defaultOpaquePorts,
s.config.EnableH2Upgrade,
s.config.ControllerNS,
s.config.IdentityTrustDomain,
s.config.DefaultOpaquePorts,
log,
stream,
)

View File

@ -509,16 +509,18 @@ spec:
return &server{
pb.UnimplementedDestinationServer{},
Config{
EnableH2Upgrade: true,
ControllerNS: "linkerd",
ClusterDomain: "mycluster.local",
IdentityTrustDomain: "trust.domain",
DefaultOpaquePorts: defaultOpaquePorts,
},
pods,
endpoints,
opaquePorts,
profiles,
clusterStore,
true,
"linkerd",
"trust.domain",
"mycluster.local",
defaultOpaquePorts,
k8sAPI,
metadataAPI,
log,
@ -576,6 +578,7 @@ func (m *mockDestinationGetProfileServer) Send(profile *pb.DestinationProfile) e
}
func makeEndpointTranslator(t *testing.T) (*mockDestinationGetServer, *endpointTranslator) {
t.Helper()
node := `apiVersion: v1
kind: Node
metadata:
@ -592,12 +595,6 @@ metadata:
topology.kubernetes.io/zone: west-1a
name: test-123
`
k8sAPI, err := k8s.NewFakeAPI(node)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
k8sAPI.Sync(nil)
metadataAPI, err := k8s.NewFakeMetadataAPI([]string{node})
if err != nil {
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
@ -609,10 +606,11 @@ metadata:
"linkerd",
"trust.domain",
true,
true, // enableEndpointFiltering
false, // experimentalEndpointZoneWeights
"service-name.service-ns",
"test-123",
map[uint32]struct{}{},
true,
metadataAPI,
mockGetServer,
nil,

View File

@ -51,6 +51,7 @@ type (
OwnerKind string
Identity string
AuthorityOverride string
Zone *string
ForZones []discovery.ForZone
OpaqueProtocol bool
}
@ -898,6 +899,7 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A
pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
continue
}
address.Zone = endpoint.Zone
if endpoint.Hints != nil {
zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones))
copy(zones, endpoint.Hints.ForZones)

View File

@ -26,9 +26,11 @@ func Main(args []string) {
addr := cmd.String("addr", ":8086", "address to serve on")
metricsAddr := cmd.String("metrics-addr", ":9996", "address to serve scrapable metrics on")
kubeConfigPath := cmd.String("kubeconfig", "", "path to kube config")
enableH2Upgrade := cmd.Bool("enable-h2-upgrade", true, "Enable transparently upgraded HTTP2 connections among pods in the service mesh")
controllerNamespace := cmd.String("controller-namespace", "linkerd", "namespace in which Linkerd is installed")
enableEndpointSlices := cmd.Bool("enable-endpoint-slices", true, "Enable the usage of EndpointSlice informers and resources")
enableH2Upgrade := cmd.Bool("enable-h2-upgrade", true,
"Enable transparently upgraded HTTP2 connections among pods in the service mesh")
enableEndpointSlices := cmd.Bool("enable-endpoint-slices", true,
"Enable the usage of EndpointSlice informers and resources")
trustDomain := cmd.String("identity-trust-domain", "", "configures the name suffix used for identities")
clusterDomain := cmd.String("cluster-domain", "", "kubernetes cluster domain")
defaultOpaquePorts := cmd.String("default-opaque-ports", "", "configures the default opaque ports")
@ -36,6 +38,12 @@ func Main(args []string) {
traceCollector := flags.AddTraceFlags(cmd)
// Zone weighting is disabled by default because it is not consumed by
// proxies. This feature exists to support experimentation on top of the
// Linkerd control plane API.
experimentalEndpointZoneWeights := cmd.Bool("experimental-endpoint-zone-weights", false,
"Enable setting endpoint weighting based on zone locality")
flags.ConfigureAndParse(cmd, args)
ready := false
@ -127,17 +135,21 @@ func Main(args []string) {
log.Fatalf("Failed to initialize Cluster Store: %s", err)
}
config := destination.Config{
ControllerNS: *controllerNamespace,
IdentityTrustDomain: *trustDomain,
ClusterDomain: *clusterDomain,
DefaultOpaquePorts: opaquePorts,
EnableH2Upgrade: *enableH2Upgrade,
EnableEndpointSlices: *enableEndpointSlices,
ExperimentalEndpointZoneWeights: *experimentalEndpointZoneWeights,
}
server, err := destination.NewServer(
*addr,
*controllerNamespace,
*trustDomain,
*enableH2Upgrade,
*enableEndpointSlices,
config,
k8sAPI,
metadataAPI,
clusterStore,
*clusterDomain,
opaquePorts,
done,
)