Add IPv6 support for the destination controller (#12428)

Services in dual-stack mode result in the creation of two EndpointSlices, one for each IP family. Before this change, the Get Destination API would nondeterministically return the addresses from either of those EndpointSlices, depending on which one the controller processed last, because they would overwrite each other.

As part of the ongoing effort to support IPv6/dual-stack networks, this change fixes that behavior by giving preference to IPv6 addresses whenever a service exposes both families.
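For illustration, here is a minimal, self-contained sketch of that preference rule (the real implementation is selectAddressFamily in the diff below; the id type and the selectFamily helper here are simplified stand-ins, not the actual watcher types):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// id is a simplified stand-in for the watcher ID: the IP family is part of
// the key, so a dual-stack pod contributes one entry per family.
type id struct {
	name   string
	family corev1.IPFamily
}

// selectFamily mirrors the rule in selectAddressFamily: drop IPv6 entries
// when IPv6 is disabled, and drop an IPv4 entry whenever an IPv6 sibling
// with the same name exists.
func selectFamily(addrs map[id]string, enableIPv6 bool) map[id]string {
	out := make(map[id]string)
	for k, a := range addrs {
		if k.family == corev1.IPv6Protocol && !enableIPv6 {
			continue
		}
		if k.family == corev1.IPv4Protocol && enableIPv6 {
			alt := id{name: k.name, family: corev1.IPv6Protocol}
			if _, ok := addrs[alt]; ok {
				continue // prefer the IPv6 sibling
			}
		}
		out[k] = a
	}
	return out
}

func main() {
	dual := map[id]string{
		{name: "pod1", family: corev1.IPv4Protocol}: "1.1.1.1:1",
		{name: "pod1", family: corev1.IPv6Protocol}: "[2001:db8::1]:1",
	}
	// Only the IPv6 entry survives when both families are present.
	fmt.Println(selectFamily(dual, true))
}

Note that sendFilteredUpdate applies zone filtering before family selection, so when a pod's IPv6 address is filtered out by topology its IPv4 address is still sent; the new zone-filtering test cases cover exactly that interaction.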

There is a new set of unit tests in server_ipv6_test.go, and the TestEndpointTranslatorForPods tests gain a couple of new cases exercising the interaction with zone filtering.
The server unit tests were also updated to segregate the tests and resources dealing with the IPv4/IPv6/dual-stack cases.
Alejandro Pedraza 2024-05-02 14:39:05 -05:00, committed by GitHub
parent 7cbe2f5ca6
commit 137eac9df3
27 changed files with 618 additions and 76 deletions

View File

@ -205,6 +205,7 @@ spec:
- -cluster-domain={{.Values.clusterDomain}}
- -identity-trust-domain={{.Values.identityTrustDomain | default .Values.clusterDomain}}
- -default-opaque-ports={{.Values.proxy.opaquePorts}}
- -enable-ipv6={{not .Values.disableIPv6}}
- -enable-pprof={{.Values.enablePprof | default false}}
{{- if (.Values.destinationController).meshedHttp2ClientProtobuf }}
- --meshed-http2-client-params={{ toJson .Values.destinationController.meshedHttp2ClientProtobuf }}

View File

@ -1476,6 +1476,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=cluster.local
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
- -trace-collector=collector.linkerd-jaeger.svc.cluster.local:55678

View File

@ -1475,6 +1475,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=cluster.local
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:install-control-plane-version

View File

@ -1475,6 +1475,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=cluster.local
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: my.custom.registry/linkerd-io/controller:install-control-plane-version

View File

@ -1475,6 +1475,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=cluster.local
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:install-control-plane-version

View File

@ -1475,6 +1475,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=cluster.local
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:install-control-plane-version

View File

@ -1464,6 +1464,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=cluster.local
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:install-control-plane-version

View File

@ -1597,6 +1597,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=cluster.local
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:install-control-plane-version

View File

@ -1597,6 +1597,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=cluster.local
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:install-control-plane-version

View File

@ -1406,6 +1406,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=cluster.local
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:install-control-plane-version

View File

@ -1450,6 +1450,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=test.trust.domain
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:linkerd-version

View File

@ -1572,6 +1572,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=test.trust.domain
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:linkerd-version

View File

@ -1584,6 +1584,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=test.trust.domain
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:linkerd-version

View File

@ -1562,6 +1562,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=test.trust.domain
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:linkerd-version

View File

@ -1469,6 +1469,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=cluster.local
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:install-control-plane-version

View File

@ -1407,6 +1407,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=cluster.local
- -default-opaque-ports=25,443,587,3306,5432,11211
- -enable-ipv6=true
- -enable-pprof=false
image: ControllerImage:LinkerdVersion
imagePullPolicy: ImagePullPolicy

View File

@ -1475,6 +1475,7 @@ spec:
- -cluster-domain=cluster.local
- -identity-trust-domain=cluster.local
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:install-control-plane-version

View File

@ -1475,6 +1475,7 @@ spec:
- -cluster-domain=example.com
- -identity-trust-domain=example.com
- -default-opaque-ports=25,587,3306,4444,5432,6379,9300,11211
- -enable-ipv6=true
- -enable-pprof=false
- --meshed-http2-client-params={"keep_alive":{"interval":{"seconds":10},"timeout":{"seconds":3},"while_idle":true}}
image: cr.l5d.io/linkerd/controller:install-control-plane-version

View File

@ -40,6 +40,8 @@ type (
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
extEndpointZoneWeights bool
meshedHTTP2ClientParams *pb.Http2ClientParams
@ -83,6 +85,7 @@ func newEndpointTranslator(
identityTrustDomain string,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
extEndpointZoneWeights bool,
meshedHTTP2ClientParams *pb.Http2ClientParams,
service string,
@ -114,6 +117,7 @@ func newEndpointTranslator(
defaultOpaquePorts,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
extEndpointZoneWeights,
meshedHTTP2ClientParams,
@ -239,6 +243,7 @@ func (et *endpointTranslator) noEndpoints(exists bool) {
func (et *endpointTranslator) sendFilteredUpdate() {
filtered := et.filterAddresses()
filtered = et.selectAddressFamily(filtered)
diffAdd, diffRemove := et.diffEndpoints(filtered)
if len(diffAdd.Addresses) > 0 {
@ -251,6 +256,33 @@ func (et *endpointTranslator) sendFilteredUpdate() {
et.filteredSnapshot = filtered
}
func (et *endpointTranslator) selectAddressFamily(addresses watcher.AddressSet) watcher.AddressSet {
filtered := make(map[watcher.ID]watcher.Address)
for id, addr := range addresses.Addresses {
if id.IPFamily == corev1.IPv6Protocol && !et.enableIPv6 {
continue
}
if id.IPFamily == corev1.IPv4Protocol && et.enableIPv6 {
// Only consider IPv4 addresses for which there's not already an IPv6
// alternative.
altID := id
altID.IPFamily = corev1.IPv6Protocol
if _, ok := addresses.Addresses[altID]; ok {
continue
}
}
filtered[id] = addr
}
return watcher.AddressSet{
Addresses: filtered,
Labels: addresses.Labels,
LocalTrafficPolicy: addresses.LocalTrafficPolicy,
}
}
// filterAddresses is responsible for filtering endpoints based on the node's
// topology zone. The client will only receive endpoints with the same
// consumption zone as the node. An endpoint's consumption zone is set

View File

@ -2,6 +2,7 @@ package destination
import (
"fmt"
"net/netip"
"sort"
"strings"
"sync"
@ -41,6 +42,26 @@ var (
OwnerName: "rc-name",
}
pod1IPv6 = watcher.Address{
IP: "2001:0db8:85a3:0000:0000:8a2e:0370:7333",
Port: 1,
Pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "ns",
Labels: map[string]string{
k8s.ControllerNSLabel: "linkerd",
k8s.ProxyDeploymentLabel: "deployment-name",
},
},
Spec: corev1.PodSpec{
ServiceAccountName: "serviceaccount-name",
},
},
OwnerKind: "replicationcontroller",
OwnerName: "rc-name",
}
pod2 = watcher.Address{
IP: "1.1.1.2",
Port: 2,
@ -400,8 +421,8 @@ func TestEndpointTranslatorForPods(t *testing.T) {
translator.Start()
defer translator.Stop()
translator.Add(mkAddressSetForPods(pod1, pod2))
translator.Remove(mkAddressSetForPods(pod2))
translator.Add(mkAddressSetForPods(t, pod1, pod2))
translator.Remove(mkAddressSetForPods(t, pod2))
expectedNumUpdates := 2
<-mockGetServer.updatesReceived // Add
@ -417,8 +438,8 @@ func TestEndpointTranslatorForPods(t *testing.T) {
translator.Start()
defer translator.Stop()
translator.Add(mkAddressSetForPods(pod1, pod2, pod3))
translator.Remove(mkAddressSetForPods(pod3))
translator.Add(mkAddressSetForPods(t, pod1, pod2, pod3))
translator.Remove(mkAddressSetForPods(t, pod3))
addressesAdded := (<-mockGetServer.updatesReceived).GetAdd().Addrs
actualNumberOfAdded := len(addressesAdded)
@ -447,7 +468,7 @@ func TestEndpointTranslatorForPods(t *testing.T) {
translator.Start()
defer translator.Stop()
translator.Add(mkAddressSetForPods(pod1))
translator.Add(mkAddressSetForPods(t, pod1))
update := <-mockGetServer.updatesReceived
@ -479,7 +500,7 @@ func TestEndpointTranslatorForPods(t *testing.T) {
translator.Start()
defer translator.Stop()
translator.Add(mkAddressSetForPods(pod1))
translator.Add(mkAddressSetForPods(t, pod1))
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
if len(addrs) != 1 {
@ -518,6 +539,56 @@ func TestEndpointTranslatorForPods(t *testing.T) {
t.Fatalf("ProtocolHint: %v", diff)
}
})
t.Run("Sends IPv6 only when pod has both IPv4 and IPv6", func(t *testing.T) {
mockGetServer, translator := makeEndpointTranslator(t)
translator.Start()
defer translator.Stop()
translator.Add(mkAddressSetForPods(t, pod1, pod1IPv6))
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
if len(addrs) != 1 {
t.Fatalf("Expected [1] address returned, got %v", addrs)
}
if ipPort := addr.ProxyAddressToString(addrs[0].GetAddr()); ipPort != "[2001:db8:85a3::8a2e:370:7333]:1" {
t.Fatalf("Expected address to be [%s], got [%s]", "[2001:db8:85a3::8a2e:370:7333]:1", ipPort)
}
if updates := len(mockGetServer.updatesReceived); updates > 0 {
t.Fatalf("Expected to receive no more messages, received [%d]", updates)
}
})
t.Run("Sends IPv4 only when pod has both IPv4 and IPv6 but the latter in another zone ", func(t *testing.T) {
mockGetServer, translator := makeEndpointTranslator(t)
translator.Start()
defer translator.Stop()
pod1West1a := pod1
pod1West1a.ForZones = []v1.ForZone{
{Name: "west-1a"},
}
pod1IPv6West1b := pod1IPv6
pod1IPv6West1b.ForZones = []v1.ForZone{
{Name: "west-1b"},
}
translator.Add(mkAddressSetForPods(t, pod1West1a, pod1IPv6West1b))
addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
if len(addrs) != 1 {
t.Fatalf("Expected [1] address returned, got %v", addrs)
}
if ipPort := addr.ProxyAddressToString(addrs[0].GetAddr()); ipPort != "1.1.1.1:1" {
t.Fatalf("Expected address to be [%s], got [%s]", "1.1.1.1:1", ipPort)
}
if updates := len(mockGetServer.updatesReceived); updates > 0 {
t.Fatalf("Expected to receive no more messages, received [%d]", updates)
}
})
}
func TestEndpointTranslatorExternalWorkloads(t *testing.T) {
@ -842,13 +913,30 @@ func mkAddressSetForServices(gatewayAddresses ...watcher.Address) watcher.Addres
return set
}
func mkAddressSetForPods(podAddresses ...watcher.Address) watcher.AddressSet {
func mkAddressSetForPods(t *testing.T, podAddresses ...watcher.Address) watcher.AddressSet {
t.Helper()
set := watcher.AddressSet{
Addresses: make(map[watcher.PodID]watcher.Address),
Labels: map[string]string{"service": "service-name", "namespace": "service-ns"},
}
for _, p := range podAddresses {
id := watcher.PodID{Name: p.Pod.Name, Namespace: p.Pod.Namespace}
// The IP family is set on the PodID used to index the
// watcher.Address; here we simply detect it
fam := corev1.IPv4Protocol
addr, err := netip.ParseAddr(p.IP)
if err != nil {
t.Fatalf("Invalid IP '%s': %s", p.IP, err)
}
if addr.Is6() {
fam = corev1.IPv6Protocol
}
id := watcher.PodID{
Name: p.Pod.Name,
Namespace: p.Pod.Namespace,
IPFamily: fam,
}
set.Addresses[id] = p
}
return set

View File

@ -32,6 +32,7 @@ type (
EnableH2Upgrade,
EnableEndpointSlices,
EnableIPv6,
ExtEndpointZoneWeights bool
MeshedHttp2ClientParams *pb.Http2ClientParams
@ -188,6 +189,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
remoteConfig.TrustDomain,
s.config.EnableH2Upgrade,
false, // Disable endpoint filtering for remote discovery.
s.config.EnableIPv6,
s.config.ExtEndpointZoneWeights,
s.config.MeshedHttp2ClientParams,
fmt.Sprintf("%s.%s.svc.%s:%d", remoteSvc, service.Namespace, remoteConfig.ClusterDomain, port),
@ -220,6 +222,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
s.config.IdentityTrustDomain,
s.config.EnableH2Upgrade,
true,
s.config.EnableIPv6,
s.config.ExtEndpointZoneWeights,
s.config.MeshedHttp2ClientParams,
dest.GetPath(),

View File

@ -0,0 +1,179 @@
package destination
import (
"context"
"testing"
"time"
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/util"
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
func TestIPv6(t *testing.T) {
port := int32(port)
protocol := corev1.ProtocolTCP
server := makeServer(t)
defer server.clusterStore.UnregisterGauges()
stream := &bufferingGetStream{
updates: make(chan *pb.Update, 50),
MockServerStream: util.NewMockServerStream(),
}
defer stream.Cancel()
t.Run("Return only IPv6 endpoint for dual-stack service", func(t *testing.T) {
testReturnEndpointsForServer(t, server, stream, fullyQualifiedNameDual, podIPv6Dual, uint32(port))
})
t.Run("Returns only IPv4 endpoint when service becomes single-stack IPv4", func(t *testing.T) {
patch := []byte(`{"spec":{"clusterIPs": ["172.17.13.0"], "ipFamilies":["IPv4"]}}`)
_, err := server.k8sAPI.Client.CoreV1().Services("ns").Patch(context.Background(), "name-ds", types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil {
t.Fatalf("Failed patching name-ds service: %s", err)
}
if err = server.k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Delete(context.Background(), "name-ds-ipv6", metav1.DeleteOptions{}); err != nil {
t.Fatalf("Failed deleting name-ds-ipv6 ES: %s", err)
}
update := <-stream.updates
if updateAddAddress(t, update)[0] != "172.17.0.19:8989" {
t.Fatalf("Expected %s but got %s", "172.17.0.19:8989", updateAddAddress(t, update)[0])
}
update = <-stream.updates
if updateRemoveAddress(t, update)[0] != "[2001:db8::94]:8989" {
t.Fatalf("Expected %s but got %s", "[2001:db8::94]:8989", updateRemoveAddress(t, update)[0])
}
})
t.Run("Returns only IPv6 endpoint when service becomes dual-stack again", func(t *testing.T) {
// We patch the service to become dual-stack again and we add the IPv6
// ES. We should receive the events for the removal of the IPv4 ES and
// the addition of the IPv6 one.
patch := []byte(`{"spec":{"clusterIPs": ["172.17.13.0","2001:db8::88"], "ipFamilies":["IPv4","IPv6"]}}`)
_, err := server.k8sAPI.Client.CoreV1().Services("ns").Patch(context.Background(), "name-ds", types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil {
t.Fatalf("Failed patching name-ds service: %s", err)
}
es := &discovery.EndpointSlice{
TypeMeta: metav1.TypeMeta{
Kind: "EndpointSlice",
APIVersion: "discovery.k8s.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "name-ds-ipv6",
Namespace: "ns",
Labels: map[string]string{
"kubernetes.io/service-name": "name-ds",
},
},
AddressType: discovery.AddressTypeIPv6,
Ports: []discovery.EndpointPort{
{
Port: &port,
Protocol: &protocol,
},
},
Endpoints: []discovery.Endpoint{
{
Addresses: []string{"2001:db8::94"},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: "ns",
Name: "name-ds",
},
},
},
}
if _, err := server.k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Create(context.Background(), es, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed creating name-ds-ipv6 ES: %s", err)
}
update := <-stream.updates
if updateAddAddress(t, update)[0] != "[2001:db8::94]:8989" {
t.Fatalf("Expected %s but got %s", "[2001:db8::94]:8989", updateAddAddress(t, update)[0])
}
update = <-stream.updates
if updateRemoveAddress(t, update)[0] != "172.17.0.19:8989" {
t.Fatalf("Expected %s but got %s", "172.17.0.19:8989", updateRemoveAddress(t, update)[0])
}
})
t.Run("Doesn't return anything when adding an IPv4 to the dual-stack service", func(t *testing.T) {
es := &discovery.EndpointSlice{
TypeMeta: metav1.TypeMeta{
Kind: "EndpointSlice",
APIVersion: "discovery.k8s.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "name-ds-ipv4-2",
Namespace: "ns",
Labels: map[string]string{
"kubernetes.io/service-name": "name-ds",
},
},
AddressType: discovery.AddressTypeIPv4,
Ports: []discovery.EndpointPort{
{
Port: &port,
Protocol: &protocol,
},
},
Endpoints: []discovery.Endpoint{
{
Addresses: []string{"172.17.0.20"},
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: "ns",
Name: "name-ds",
},
},
},
}
if _, err := server.k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Create(context.Background(), es, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed creating name-ds-ipv4-2 ES: %s", err)
}
time.Sleep(50 * time.Millisecond)
if len(stream.updates) != 0 {
t.Fatalf("Expected no events but got %#v", stream.updates)
}
})
t.Run("Doesn't return anything when removing an IPv4 ES from the dual-stack service", func(t *testing.T) {
if err := server.k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Delete(context.Background(), "name-ds-ipv4-2", metav1.DeleteOptions{}); err != nil {
t.Fatalf("Failed deleting name-ds-ipv4-2 ES: %s", err)
}
time.Sleep(50 * time.Millisecond)
if len(stream.updates) != 0 {
t.Fatalf("Expected no events but got %#v", stream.updates)
}
})
t.Run("Doesn't return anything when the service becomes single-stack IPv6", func(t *testing.T) {
patch := []byte(`{"spec":{"clusterIPs": ["2001:db8::88"], "ipFamilies":["IPv6"]}}`)
_, err := server.k8sAPI.Client.CoreV1().Services("ns").Patch(context.Background(), "name-ds", types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil {
t.Fatalf("Failed patching name-ds service: %s", err)
}
if err := server.k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Delete(context.Background(), "name-ds-ipv4", metav1.DeleteOptions{}); err != nil {
t.Fatalf("Failed deleting name-ds-ipv4 ES: %s", err)
}
time.Sleep(50 * time.Millisecond)
if len(stream.updates) != 0 {
t.Fatalf("Expected no events but got %#v", stream.updates)
}
})
}

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
gonet "net"
"net/netip"
"reflect"
"testing"
"time"
@ -27,6 +28,8 @@ import (
)
const fullyQualifiedName = "name1.ns.svc.mycluster.local"
const fullyQualifiedNameIPv6 = "name-ipv6.ns.svc.mycluster.local"
const fullyQualifiedNameDual = "name-ds.ns.svc.mycluster.local"
const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local"
const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
@ -36,6 +39,7 @@ const clusterIPv6 = "2001:db8::88"
const clusterIPOpaque = "172.17.12.1"
const podIP1 = "172.17.0.12"
const podIP1v6 = "2001:db8::68"
const podIPv6Dual = "2001:db8::94"
const podIP2 = "172.17.0.13"
const podIPOpaque = "172.17.0.14"
const podIPSkipped = "172.17.0.15"
@ -82,38 +86,16 @@ func TestGet(t *testing.T) {
}
})
t.Run("Returns endpoints", func(t *testing.T) {
server := makeServer(t)
defer server.clusterStore.UnregisterGauges()
t.Run("Returns endpoints (IPv4)", func(t *testing.T) {
testReturnEndpoints(t, fullyQualifiedName, podIP1, port)
})
stream := &bufferingGetStream{
updates: make(chan *pb.Update, 50),
MockServerStream: util.NewMockServerStream(),
}
defer stream.Cancel()
errs := make(chan error)
t.Run("Returns endpoints (IPv6)", func(t *testing.T) {
testReturnEndpoints(t, fullyQualifiedNameIPv6, podIP1v6, port)
})
// server.Get blocks until the grpc stream is complete so we call it
// in a goroutine and watch stream.updates for updates.
go func() {
err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: fmt.Sprintf("%s:%d", fullyQualifiedName, port)}, stream)
if err != nil {
errs <- err
}
}()
select {
case update := <-stream.updates:
if updateAddAddress(t, update)[0] != fmt.Sprintf("%s:%d", podIP1, port) {
t.Fatalf("Expected %s but got %s", fmt.Sprintf("%s:%d", podIP1, port), updateAddAddress(t, update)[0])
}
if len(stream.updates) != 0 {
t.Fatalf("Expected 1 update but got %d: %v", 1+len(stream.updates), stream.updates)
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
}
t.Run("Returns endpoints (dual-stack)", func(t *testing.T) {
testReturnEndpoints(t, fullyQualifiedNameDual, podIPv6Dual, port)
})
t.Run("Sets meshed HTTP/2 client params", func(t *testing.T) {
@ -494,7 +476,7 @@ func TestGetProfiles(t *testing.T) {
stream := profileStream(t, server, clusterIPv6, port, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.FullyQualifiedName != fullyQualifiedName {
if profile.FullyQualifiedName != fullyQualifiedNameDual {
t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName)
}
if profile.OpaqueProtocol {
@ -608,10 +590,10 @@ func TestGetProfiles(t *testing.T) {
server := makeServer(t)
defer server.clusterStore.UnregisterGauges()
stream := profileStream(t, server, podIP1v6, port, "ns:ns")
stream := profileStream(t, server, podIPv6Dual, port, "ns:ns")
defer stream.Cancel()
epAddr, err := toAddress(podIP1v6, port)
epAddr, err := toAddress(podIPv6Dual, port)
if err != nil {
t.Fatalf("Got error: %s", err)
}
@ -1181,6 +1163,19 @@ func updateAddAddress(t *testing.T, update *pb.Update) []string {
return ips
}
func updateRemoveAddress(t *testing.T, update *pb.Update) []string {
t.Helper()
remove, ok := update.GetUpdate().(*pb.Update_Remove)
if !ok {
t.Fatalf("Update expected to be a remove, but was %+v", update)
}
ips := []string{}
for _, ip := range remove.Remove.Addrs {
ips = append(ips, addr.ProxyAddressToString(ip))
}
return ips
}
func toAddress(path string, port uint32) (*net.TcpAddress, error) {
ip, err := addr.ParseProxyIP(path)
if err != nil {
@ -1196,7 +1191,6 @@ func TestIpWatcherGetSvcID(t *testing.T) {
name := "service"
namespace := "test"
clusterIP := "10.245.0.1"
clusterIPv6 := "2001:db8::68"
k8sConfigs := `
apiVersion: v1
kind: Service
@ -1208,7 +1202,7 @@ spec:
clusterIP: 10.245.0.1
clusterIPs:
- 10.245.0.1
- 2001:db8::68
- 2001:db8::88
ports:
- port: 1234`
@ -1264,6 +1258,57 @@ spec:
})
}
func testReturnEndpoints(t *testing.T, fqdn, ip string, port uint32) {
t.Helper()
server := makeServer(t)
defer server.clusterStore.UnregisterGauges()
stream := &bufferingGetStream{
updates: make(chan *pb.Update, 50),
MockServerStream: util.NewMockServerStream(),
}
defer stream.Cancel()
testReturnEndpointsForServer(t, server, stream, fqdn, ip, port)
}
func testReturnEndpointsForServer(t *testing.T, server *server, stream *bufferingGetStream, fqdn, ip string, port uint32) {
t.Helper()
errs := make(chan error)
// server.Get blocks until the grpc stream is complete so we call it
// in a goroutine and watch stream.updates for updates.
go func() {
err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: fmt.Sprintf("%s:%d", fqdn, port)}, stream)
if err != nil {
errs <- err
}
}()
expectedAddr := fmt.Sprintf("%s:%d", ip, port)
parsedIP, err := netip.ParseAddr(ip)
if err != nil {
t.Fatalf("Invalid IP [%s]: %s", ip, err)
}
if parsedIP.Is6() {
expectedAddr = fmt.Sprintf("[%s]:%d", ip, port)
}
select {
case update := <-stream.updates:
if updateAddAddress(t, update)[0] != expectedAddr {
t.Fatalf("Expected %s but got %s", expectedAddr, updateAddAddress(t, update)[0])
}
if len(stream.updates) != 0 {
t.Fatalf("Expected 1 update but got %d: %v", 1+len(stream.updates), stream.updates)
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
}
}
func assertSingleProfile(t *testing.T, updates []*pb.DestinationProfile) *pb.DestinationProfile {
t.Helper()
// Under normal conditions the creation of resources by the fake API will

View File

@ -32,10 +32,11 @@ metadata:
namespace: ns
spec:
type: LoadBalancer
ipFamilies:
- IPv4
clusterIP: 172.17.12.0
clusterIPs:
- 172.17.12.0
- 2001:db8::88
ports:
- port: 8989`,
`
@ -55,25 +56,6 @@ endpoints:
name: name1-1
namespace: ns
ports:
- port: 8989
protocol: TCP`,
`
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
name: name1-ipv6
namespace: ns
labels:
kubernetes.io/service-name: name1
addressType: IPv6
endpoints:
- addresses:
- 2001:db8::90
targetRef:
kind: Pod
name: name1-1
namespace: ns
ports:
- port: 8989
protocol: TCP`,
`
@ -92,7 +74,6 @@ status:
podIP: 172.17.0.12
podIPs:
- ip: 172.17.0.12
- ip: 2001:db8::68
spec:
containers:
- env:
@ -752,7 +733,8 @@ ports:
- port: 80
protocol: TCP`,
}
extenalNameResources := []string{
externalNameResources := []string{
`
apiVersion: v1
kind: Service
@ -764,6 +746,158 @@ spec:
externalName: linkerd.io`,
}
ipv6 := []string{
`
apiVersion: v1
kind: Service
metadata:
name: name-ipv6
namespace: ns
spec:
type: ClusterIP
ipFamilies:
- IPv6
clusterIP: 2001:db8::93
clusterIPs:
- 2001:db8::93
ports:
- port: 8989`,
`
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
name: name-ipv6
namespace: ns
labels:
kubernetes.io/service-name: name-ipv6
addressType: IPv6
endpoints:
- addresses:
- 2001:db8::68
targetRef:
kind: Pod
name: name-ipv6
namespace: ns
ports:
- port: 8989
protocol: TCP`,
`
apiVersion: v1
kind: Pod
metadata:
labels:
linkerd.io/control-plane-ns: linkerd
name: name-ipv6
namespace: ns
status:
phase: Running
conditions:
- type: Ready
status: "True"
podIP: 2001:db8::68
podIPs:
- ip: 2001:db8::68
spec:
containers:
- env:
- name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR
value: 0.0.0.0:4143
name: linkerd-proxy`,
}
dualStack := []string{
`
apiVersion: v1
kind: Service
metadata:
name: name-ds
namespace: ns
spec:
type: ClusterIP
ipFamilies:
- IPv4
- IPv6
clusterIP: 172.17.13.0
clusterIPs:
- 172.17.13.0
- 2001:db8::88
ports:
- port: 8989`,
`
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
name: name-ds-ipv4
namespace: ns
labels:
kubernetes.io/service-name: name-ds
addressType: IPv4
endpoints:
- addresses:
- 172.17.0.19
targetRef:
kind: Pod
name: name-ds
namespace: ns
ports:
- port: 8989
protocol: TCP`,
`
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
name: name-ds-ipv6
namespace: ns
labels:
kubernetes.io/service-name: name-ds
addressType: IPv6
endpoints:
- addresses:
- 2001:db8::94
targetRef:
kind: Pod
name: name-ds
namespace: ns
ports:
- port: 8989
protocol: TCP`,
`
apiVersion: v1
kind: Pod
metadata:
labels:
linkerd.io/control-plane-ns: linkerd
name: name-ds
namespace: ns
status:
phase: Running
conditions:
- type: Ready
status: "True"
podIP: 172.17.0.19
podIPs:
- ip: 172.17.0.19
- ip: 2001:db8::94
spec:
containers:
- env:
- name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR
value: 0.0.0.0:4143
name: linkerd-proxy`,
`
apiVersion: linkerd.io/v1alpha2
kind: ServiceProfile
metadata:
name: name-ds.ns.svc.mycluster.local
namespace: ns
spec:
routes:
- name: route1
isRetryable: false
condition:
pathRegex: "/a/b/c"`,
}
res := append(meshedPodResources, clientSP...)
res = append(res, unmeshedPod)
res = append(res, meshedOpaquePodResources...)
@ -776,7 +910,9 @@ spec:
res = append(res, mirrorServiceResources...)
res = append(res, destinationCredentialsResources...)
res = append(res, externalWorkloads...)
res = append(res, extenalNameResources...)
res = append(res, externalNameResources...)
res = append(res, ipv6...)
res = append(res, dualStack...)
k8sAPI, l5dClient, err := k8s.NewFakeAPIWithL5dClient(res...)
if err != nil {
t.Fatalf("NewFakeAPIWithL5dClient returned an error: %s", err)
@ -833,6 +969,7 @@ spec:
pb.UnimplementedDestinationServer{},
Config{
EnableH2Upgrade: true,
EnableIPv6: true,
ControllerNS: "linkerd",
ClusterDomain: "mycluster.local",
IdentityTrustDomain: "trust.domain",
@ -928,6 +1065,7 @@ metadata:
"linkerd",
"trust.domain",
true,
true,
true, // enableEndpointFiltering
false, // extEndpointZoneWeights
nil, // meshedHttp2ClientParams

View File

@ -60,6 +60,12 @@ type (
}
// AddressSet is a set of Address, indexed by ID.
// The ID can be either:
// 1) A reference to a service: id.Name contains both the service name and
// the target IP and port (see newServiceRefAddress).
// 2) A reference to a pod: id.Name refers to the pod's name, and
// id.IPFamily refers to the ES AddressType (see newPodRefAddress).
// 3) A reference to an ExternalWorkload: id.Name refers to the EW's name.
AddressSet struct {
Addresses map[ID]Address
Labels map[string]string
@ -365,7 +371,7 @@ func (ew *EndpointsWatcher) addEndpoints(obj interface{}) {
return
}
id := ServiceID{endpoints.Namespace, endpoints.Name}
id := ServiceID{Namespace: endpoints.Namespace, Name: endpoints.Name}
sp := ew.getOrNewServicePublisher(id)
sp.updateEndpoints(endpoints)
}
@ -389,7 +395,7 @@ func (ew *EndpointsWatcher) updateEndpoints(oldObj interface{}, newObj interface
endpointsInformerLag.Observe(delta.Seconds())
}
id := ServiceID{newEndpoints.Namespace, newEndpoints.Name}
id := ServiceID{Namespace: newEndpoints.Namespace, Name: newEndpoints.Name}
sp := ew.getOrNewServicePublisher(id)
sp.updateEndpoints(newEndpoints)
}
@ -601,7 +607,8 @@ func (sp *servicePublisher) deleteEndpoints() {
func (sp *servicePublisher) addEndpointSlice(newSlice *discovery.EndpointSlice) {
sp.Lock()
defer sp.Unlock()
sp.log.Debugf("Adding EndpointSlice for %s", sp.id)
sp.log.Debugf("Adding ES %s/%s", newSlice.Namespace, newSlice.Name)
for _, port := range sp.ports {
port.addEndpointSlice(newSlice)
}
@ -610,7 +617,8 @@ func (sp *servicePublisher) addEndpointSlice(newSlice *discovery.EndpointSlice)
func (sp *servicePublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice, newSlice *discovery.EndpointSlice) {
sp.Lock()
defer sp.Unlock()
sp.log.Debugf("Updating EndpointSlice for %s", sp.id)
sp.log.Debugf("Updating ES %s/%s", oldSlice.Namespace, oldSlice.Name)
for _, port := range sp.ports {
port.updateEndpointSlice(oldSlice, newSlice)
}
@ -619,7 +627,8 @@ func (sp *servicePublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlic
func (sp *servicePublisher) deleteEndpointSlice(es *discovery.EndpointSlice) {
sp.Lock()
defer sp.Unlock()
sp.log.Debugf("Deleting EndpointSlice for %s", sp.id)
sp.log.Debugf("Deleting ES %s/%s", es.Namespace, es.Name)
for _, port := range sp.ports {
port.deleteEndpointSlice(es)
}
@ -926,7 +935,13 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A
if endpoint.TargetRef.Kind == endpointTargetRefPod {
for _, IPAddr := range endpoint.Addresses {
address, id, err := pp.newPodRefAddress(resolvedPort, IPAddr, endpoint.TargetRef.Name, endpoint.TargetRef.Namespace)
address, id, err := pp.newPodRefAddress(
resolvedPort,
es.AddressType,
IPAddr,
endpoint.TargetRef.Name,
endpoint.TargetRef.Namespace,
)
if err != nil {
pp.log.Errorf("Unable to create new address:%v", err)
continue
@ -1023,6 +1038,7 @@ func (pp *portPublisher) endpointSliceToIDs(es *discovery.EndpointSlice) []ID {
ids = append(ids, PodID{
Name: endpoint.TargetRef.Name,
Namespace: endpoint.TargetRef.Namespace,
IPFamily: corev1.IPFamily(es.AddressType),
})
} else if endpoint.TargetRef.Kind == endpointTargetRefExternalWorkload {
ids = append(ids, ExternalWorkloadID{
@ -1062,7 +1078,13 @@ func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) Addre
}
if endpoint.TargetRef.Kind == endpointTargetRefPod {
address, id, err := pp.newPodRefAddress(resolvedPort, endpoint.IP, endpoint.TargetRef.Name, endpoint.TargetRef.Namespace)
address, id, err := pp.newPodRefAddress(
resolvedPort,
"",
endpoint.IP,
endpoint.TargetRef.Name,
endpoint.TargetRef.Namespace,
)
if err != nil {
pp.log.Errorf("Unable to create new address:%v", err)
continue
@ -1095,10 +1117,17 @@ func (pp *portPublisher) newServiceRefAddress(endpointPort Port, endpointIP, ser
return Address{IP: endpointIP, Port: endpointPort}, id
}
func (pp *portPublisher) newPodRefAddress(endpointPort Port, endpointIP, podName, podNamespace string) (Address, PodID, error) {
func (pp *portPublisher) newPodRefAddress(
endpointPort Port,
ipFamily discovery.AddressType,
endpointIP,
podName,
podNamespace string,
) (Address, PodID, error) {
id := PodID{
Name: podName,
Namespace: podNamespace,
IPFamily: corev1.IPFamily(ipFamily),
}
pod, err := pp.k8sAPI.Pod().Lister().Pods(id.Namespace).Get(id.Name)
if err != nil {
@ -1462,12 +1491,12 @@ func getEndpointSliceServiceID(es *discovery.EndpointSlice) (ServiceID, error) {
}
if svc, ok := es.Labels[discovery.LabelServiceName]; ok {
return ServiceID{es.Namespace, svc}, nil
return ServiceID{Namespace: es.Namespace, Name: svc}, nil
}
for _, ref := range es.OwnerReferences {
if ref.Kind == "Service" && ref.Name != "" {
return ServiceID{es.Namespace, ref.Name}, nil
return ServiceID{Namespace: es.Namespace, Name: ref.Name}, nil
}
}

View File

@ -33,6 +33,9 @@ type (
ID struct {
Namespace string
Name string
// Only used for PodID
IPFamily corev1.IPFamily
}
// ServiceID is the namespace-qualified name of a service.
ServiceID = ID
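
To make the overwrite fix concrete, a small sketch (types simplified to this ID shape; the map and its values are illustrative, taken from the dual-stack test fixtures above) of how the two EndpointSlices of a dual-stack service now index two distinct entries instead of clobbering one another:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

type ID struct {
	Namespace string
	Name      string
	IPFamily  corev1.IPFamily // only used for pod IDs
}

func main() {
	addresses := map[ID]string{}

	// Before this change both EndpointSlices produced the same key and the
	// last one processed won; with the family in the key, both entries
	// coexist and selectAddressFamily can prefer the IPv6 one.
	addresses[ID{Namespace: "ns", Name: "name-ds", IPFamily: corev1.IPv4Protocol}] = "172.17.0.19:8989"
	addresses[ID{Namespace: "ns", Name: "name-ds", IPFamily: corev1.IPv6Protocol}] = "[2001:db8::94]:8989"

	fmt.Println(len(addresses)) // 2
}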

View File

@ -36,6 +36,8 @@ func Main(args []string) {
"Enable transparently upgraded HTTP2 connections among pods in the service mesh")
enableEndpointSlices := cmd.Bool("enable-endpoint-slices", true,
"Enable the usage of EndpointSlice informers and resources")
enableIPv6 := cmd.Bool("enable-ipv6", true,
"Set to true to allow discovering IPv6 endpoints and preferring IPv6 when both IPv4 and IPv6 are available")
trustDomain := cmd.String("identity-trust-domain", "", "configures the name suffix used for identities")
clusterDomain := cmd.String("cluster-domain", "", "kubernetes cluster domain")
defaultOpaquePorts := cmd.String("default-opaque-ports", "", "configures the default opaque ports")
@ -62,6 +64,10 @@ func Main(args []string) {
flags.ConfigureAndParse(cmd, args)
if *enableIPv6 && !*enableEndpointSlices {
log.Fatal("If --enable-ipv6=true then --enable-endpoint-slices needs to be true")
}
var meshedHTTP2ClientParams *pb.Http2ClientParams
if meshedHTTP2ClientParamsJSON != nil && *meshedHTTP2ClientParamsJSON != "" {
meshedHTTP2ClientParams = &pb.Http2ClientParams{}
@ -167,6 +173,7 @@ func Main(args []string) {
DefaultOpaquePorts: opaquePorts,
EnableH2Upgrade: *enableH2Upgrade,
EnableEndpointSlices: *enableEndpointSlices,
EnableIPv6: *enableIPv6,
ExtEndpointZoneWeights: *extEndpointZoneWeights,
MeshedHttp2ClientParams: meshedHTTP2ClientParams,
}