Detect changes in addresses when getting updates in endpoints watcher (#4104)

Detect changes in addresses when getting updates in endpoints watcher

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
This commit is contained in:
Zahari Dichev 2020-04-10 11:42:39 +03:00 committed by GitHub
parent 7b9d475ffc
commit 26c14d3c66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 324 additions and 3 deletions

View File

@ -601,15 +601,37 @@ func getTargetPort(service *corev1.Service, port Port) namedPort {
return targetPort
}
func addressChanged(oldAddress Address, newAddress Address) bool {
if oldAddress.Identity != newAddress.Identity {
// in this case the identity could have changed; this can happen when for
// example a mirrored service is reassigned to a new gateway with a different
// identity and the service mirroring controller picks that and updates the
// identity
return true
}
if oldAddress.Pod != nil && newAddress.Pod != nil {
// if these addresses are owned by pods we can check the resource versions
return oldAddress.Pod.ResourceVersion != newAddress.Pod.ResourceVersion
}
return false
}
func diffAddresses(oldAddresses, newAddresses AddressSet) (add, remove AddressSet) {
// TODO: this detects pods which have been added or removed, but does not
// detect addresses which have been modified. A modified address should trigger
// an add of the new version.
addAddesses := make(map[ID]Address)
removeAddresses := make(map[ID]Address)
for id, address := range newAddresses.Addresses {
if _, ok := oldAddresses.Addresses[id]; !ok {
addAddesses[id] = address
for id, newAddress := range newAddresses.Addresses {
if oldAddress, ok := oldAddresses.Addresses[id]; ok {
if addressChanged(oldAddress, newAddress) {
addAddesses[id] = newAddress
}
} else {
// this is a new address, we need to add it
addAddesses[id] = newAddress
}
}
for id, address := range oldAddresses.Addresses {

View File

@ -9,6 +9,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
consts "github.com/linkerd/linkerd2/pkg/k8s"
logging "github.com/sirupsen/logrus"
)
@ -54,6 +55,36 @@ func (bel *bufferingEndpointListener) NoEndpoints(exists bool) {
bel.noEndpointsExists = exists
}
type bufferingEndpointListenerWithResVersion struct {
added []string
removed []string
}
func newBufferingEndpointListenerWithResVersion() *bufferingEndpointListenerWithResVersion {
return &bufferingEndpointListenerWithResVersion{
added: []string{},
removed: []string{},
}
}
func addressStringWithResVerson(address Address) string {
return fmt.Sprintf("%s:%d:%s", address.IP, address.Port, address.Pod.ResourceVersion)
}
func (bel *bufferingEndpointListenerWithResVersion) Add(set AddressSet) {
for _, address := range set.Addresses {
bel.added = append(bel.added, addressStringWithResVerson(address))
}
}
func (bel *bufferingEndpointListenerWithResVersion) Remove(set AddressSet) {
for _, address := range set.Addresses {
bel.removed = append(bel.removed, addressStringWithResVerson(address))
}
}
func (bel *bufferingEndpointListenerWithResVersion) NoEndpoints(exists bool) {}
func TestEndpointsWatcher(t *testing.T) {
for _, tt := range []struct {
serviceType string
@ -821,3 +852,271 @@ subsets:
})
}
}
func testPod(resVersion string) *corev1.Pod {
return &corev1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: resVersion,
Name: "name1-1",
Namespace: "ns",
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
PodIP: "172.17.0.12",
},
}
}
func endpoints(identity string) *corev1.Endpoints {
return &corev1.Endpoints{
TypeMeta: metav1.TypeMeta{
Kind: "Endpoints",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "remote-service",
Namespace: "ns",
Annotations: map[string]string{
consts.RemoteGatewayIdentity: identity,
consts.RemoteServiceFqName: "remote-service.svc.default.cluster.local",
},
Labels: map[string]string{
consts.MirroredResourceLabel: "true",
},
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "1.2.3.4",
},
},
Ports: []corev1.EndpointPort{
{
Port: 80,
},
},
},
},
}
}
func TestEndpointsChangeDetection(t *testing.T) {
k8sConfigs := []string{`
apiVersion: v1
kind: Service
metadata:
name: remote-service
namespace: ns
spec:
ports:
- port: 80
targetPort: 80`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: remote-service
namespace: ns
annotations:
mirror.linkerd.io/remote-gateway-identity: "gateway-identity-1"
mirror.linkerd.io/remote-svc-fq-name: "remote-service.svc.default.cluster.local"
labels:
mirror.linkerd.io/mirrored-service: "true"
subsets:
- addresses:
- ip: 1.2.3.4
ports:
- port: 80`,
}
for _, tt := range []struct {
serviceType string
id ServiceID
port Port
newEndpoints *corev1.Endpoints
expectedAddresses []string
}{
{
serviceType: "will update endpoints if identity is different",
id: ServiceID{Name: "remote-service", Namespace: "ns"},
port: 80,
newEndpoints: endpoints("gateway-identity-2"),
expectedAddresses: []string{"1.2.3.4:80/gateway-identity-1/remote-service.svc.default.cluster.local:80", "1.2.3.4:80/gateway-identity-2/remote-service.svc.default.cluster.local:80"},
},
{
serviceType: "will not update endpoints if identity is the same",
id: ServiceID{Name: "remote-service", Namespace: "ns"},
port: 80,
newEndpoints: endpoints("gateway-identity-1"),
expectedAddresses: []string{"1.2.3.4:80/gateway-identity-1/remote-service.svc.default.cluster.local:80"},
},
} {
tt := tt // pin
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
k8sAPI, err := k8s.NewFakeAPI(k8sConfigs...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()))
k8sAPI.Sync(nil)
listener := newBufferingEndpointListener()
err = watcher.Subscribe(tt.id, tt.port, "", listener)
if err != nil {
t.Fatal(err)
}
k8sAPI.Sync(nil)
watcher.addEndpoints(tt.newEndpoints)
actualAddresses := make([]string, 0)
actualAddresses = append(actualAddresses, listener.added...)
sort.Strings(actualAddresses)
testCompare(t, tt.expectedAddresses, actualAddresses)
})
}
}
func TestPodChangeDetection(t *testing.T) {
endpoints := &corev1.Endpoints{
TypeMeta: metav1.TypeMeta{
Kind: "Endpoints",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "name1",
Namespace: "ns",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "172.17.0.12",
Hostname: "name1-1",
TargetRef: &corev1.ObjectReference{
Kind: "Pod",
Namespace: "ns",
Name: "name1-1",
},
},
},
Ports: []corev1.EndpointPort{
{
Port: 8989,
},
},
},
},
}
k8sConfigs := []string{`
apiVersion: v1
kind: Service
metadata:
name: name1
namespace: ns
spec:
type: LoadBalancer
ports:
- port: 8989`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: name1
namespace: ns
subsets:
- addresses:
- ip: 172.17.0.12
hostname: name1-1
targetRef:
kind: Pod
name: name1-1
namespace: ns
ports:
- port: 8989`,
`
apiVersion: v1
kind: Pod
metadata:
name: name1-1
namespace: ns
resourceVersion: "1"
status:
phase: Running
podIP: 172.17.0.12`}
for _, tt := range []struct {
serviceType string
id ServiceID
hostname string
port Port
newPod *corev1.Pod
expectedAddresses []string
}{
{
serviceType: "will update pod if resource version is different",
id: ServiceID{Name: "name1", Namespace: "ns"},
port: 8989,
hostname: "name1-1",
newPod: testPod("2"),
expectedAddresses: []string{"172.17.0.12:8989:1", "172.17.0.12:8989:2"},
},
{
serviceType: "will not update pod if resource version is the same",
id: ServiceID{Name: "name1", Namespace: "ns"},
port: 8989,
hostname: "name1-1",
newPod: testPod("1"),
expectedAddresses: []string{"172.17.0.12:8989:1"},
},
} {
tt := tt // pin
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
k8sAPI, err := k8s.NewFakeAPI(k8sConfigs...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()))
k8sAPI.Sync(nil)
listener := newBufferingEndpointListenerWithResVersion()
err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
if err != nil {
t.Fatal(err)
}
err = k8sAPI.Pod().Informer().GetStore().Add(tt.newPod)
if err != nil {
t.Fatal(err)
}
k8sAPI.Sync(nil)
watcher.addEndpoints(endpoints)
actualAddresses := make([]string, 0)
actualAddresses = append(actualAddresses, listener.added...)
sort.Strings(actualAddresses)
testCompare(t, tt.expectedAddresses, actualAddresses)
})
}
}