mirror of https://github.com/linkerd/linkerd2.git
3465 lines
76 KiB
Go
3465 lines
76 KiB
Go
package watcher
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/linkerd/linkerd2/controller/k8s"
|
|
consts "github.com/linkerd/linkerd2/pkg/k8s"
|
|
"github.com/linkerd/linkerd2/testutil"
|
|
logging "github.com/sirupsen/logrus"
|
|
corev1 "k8s.io/api/core/v1"
|
|
dv1 "k8s.io/api/discovery/v1"
|
|
kerrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
)
|
|
|
|
// bufferingEndpointListener is a test listener that records every update it
// receives so that a test can assert on them afterwards.
type bufferingEndpointListener struct {
	// Mutex guards all of the fields below.
	sync.Mutex

	// added and removed hold the rendered form (see addressString) of every
	// address passed to Add and Remove respectively.
	added, removed []string

	// localTrafficPolicy mirrors the LocalTrafficPolicy flag of the most
	// recent set passed to Add or Remove.
	localTrafficPolicy bool

	// noEndpointsCalled becomes true once NoEndpoints has been invoked;
	// noEndpointsExist keeps the argument of the latest invocation.
	noEndpointsCalled bool
	noEndpointsExist  bool
}
|
|
|
|
func newBufferingEndpointListener() *bufferingEndpointListener {
|
|
return &bufferingEndpointListener{
|
|
added: []string{},
|
|
removed: []string{},
|
|
Mutex: sync.Mutex{},
|
|
}
|
|
}
|
|
|
|
func addressString(address Address) string {
|
|
addressString := fmt.Sprintf("%s:%d", address.IP, address.Port)
|
|
if address.Identity != "" {
|
|
addressString = fmt.Sprintf("%s/%s", addressString, address.Identity)
|
|
}
|
|
if address.AuthorityOverride != "" {
|
|
addressString = fmt.Sprintf("%s/%s", addressString, address.AuthorityOverride)
|
|
}
|
|
return addressString
|
|
}
|
|
|
|
func (bel *bufferingEndpointListener) ExpectAdded(expected []string, t *testing.T) {
|
|
bel.Lock()
|
|
defer bel.Unlock()
|
|
t.Helper()
|
|
sort.Strings(bel.added)
|
|
testCompare(t, expected, bel.added)
|
|
}
|
|
|
|
func (bel *bufferingEndpointListener) ExpectRemoved(expected []string, t *testing.T) {
|
|
bel.Lock()
|
|
defer bel.Unlock()
|
|
t.Helper()
|
|
sort.Strings(bel.removed)
|
|
testCompare(t, expected, bel.removed)
|
|
}
|
|
|
|
func (bel *bufferingEndpointListener) endpointsAreNotCalled() bool {
|
|
bel.Lock()
|
|
defer bel.Unlock()
|
|
return bel.noEndpointsCalled
|
|
}
|
|
|
|
func (bel *bufferingEndpointListener) endpointsDoNotExist() bool {
|
|
bel.Lock()
|
|
defer bel.Unlock()
|
|
return bel.noEndpointsExist
|
|
}
|
|
|
|
func (bel *bufferingEndpointListener) Add(set AddressSet) {
|
|
bel.Lock()
|
|
defer bel.Unlock()
|
|
for _, address := range set.Addresses {
|
|
bel.added = append(bel.added, addressString(address))
|
|
}
|
|
bel.localTrafficPolicy = set.LocalTrafficPolicy
|
|
}
|
|
|
|
func (bel *bufferingEndpointListener) Remove(set AddressSet) {
|
|
bel.Lock()
|
|
defer bel.Unlock()
|
|
for _, address := range set.Addresses {
|
|
bel.removed = append(bel.removed, addressString(address))
|
|
}
|
|
bel.localTrafficPolicy = set.LocalTrafficPolicy
|
|
}
|
|
|
|
func (bel *bufferingEndpointListener) NoEndpoints(exists bool) {
|
|
bel.Lock()
|
|
defer bel.Unlock()
|
|
bel.noEndpointsCalled = true
|
|
bel.noEndpointsExist = exists
|
|
}
|
|
|
|
// bufferingEndpointListenerWithResVersion is a test listener that records the
// addresses it receives, rendered with addressStringWithResVersion.
type bufferingEndpointListenerWithResVersion struct {
	// Mutex guards the two buffers below.
	sync.Mutex

	// added and removed hold the rendered addresses passed to Add and Remove
	// respectively.
	added, removed []string
}
|
|
|
|
func newBufferingEndpointListenerWithResVersion() *bufferingEndpointListenerWithResVersion {
|
|
return &bufferingEndpointListenerWithResVersion{
|
|
added: []string{},
|
|
removed: []string{},
|
|
Mutex: sync.Mutex{},
|
|
}
|
|
}
|
|
|
|
func addressStringWithResVersion(address Address) string {
|
|
return fmt.Sprintf("%s:%d:%s", address.IP, address.Port, address.Pod.ResourceVersion)
|
|
}
|
|
|
|
func (bel *bufferingEndpointListenerWithResVersion) ExpectAdded(expected []string, t *testing.T) {
|
|
bel.Lock()
|
|
defer bel.Unlock()
|
|
sort.Strings(bel.added)
|
|
testCompare(t, expected, bel.added)
|
|
}
|
|
|
|
func (bel *bufferingEndpointListenerWithResVersion) ExpectRemoved(expected []string, t *testing.T) {
|
|
bel.Lock()
|
|
defer bel.Unlock()
|
|
sort.Strings(bel.removed)
|
|
testCompare(t, expected, bel.removed)
|
|
}
|
|
|
|
func (bel *bufferingEndpointListenerWithResVersion) Add(set AddressSet) {
|
|
bel.Lock()
|
|
defer bel.Unlock()
|
|
for _, address := range set.Addresses {
|
|
bel.added = append(bel.added, addressStringWithResVersion(address))
|
|
}
|
|
}
|
|
|
|
func (bel *bufferingEndpointListenerWithResVersion) Remove(set AddressSet) {
|
|
bel.Lock()
|
|
defer bel.Unlock()
|
|
for _, address := range set.Addresses {
|
|
bel.removed = append(bel.removed, addressStringWithResVersion(address))
|
|
}
|
|
}
|
|
|
|
// NoEndpoints does nothing: this listener only buffers the addresses passed
// to Add and Remove and keeps no record of "no endpoints" notifications.
func (bel *bufferingEndpointListenerWithResVersion) NoEndpoints(exists bool) {
	// Intentionally empty.
}
|
|
|
|
// TestEndpointsWatcher runs a table of cases against an endpoints watcher
// created with the fourth NewEndpointsWatcher argument set to false. Each
// case builds a fake API from its manifests, subscribes a
// bufferingEndpointListener for (id, port, hostname) and then checks:
//   - whether Subscribe returned an error (expectedError),
//   - the sorted addresses delivered through Add (expectedAddresses),
//   - whether NoEndpoints was called (expectedNoEndpoints), and
//   - the `exists` value NoEndpoints stored (expectedNoEndpointsServiceExists).
func TestEndpointsWatcher(t *testing.T) {
|
|
for _, tt := range []struct {
|
|
serviceType string
|
|
k8sConfigs []string
|
|
id ServiceID
|
|
hostname string
|
|
port Port
|
|
expectedAddresses []string
|
|
expectedNoEndpoints bool
|
|
expectedNoEndpointsServiceExists bool
|
|
expectedError bool
|
|
}{
|
|
{
|
|
serviceType: "local services",
|
|
k8sConfigs: []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Endpoints
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
subsets:
|
|
- addresses:
|
|
- ip: 172.17.0.12
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-1
|
|
namespace: ns
|
|
- ip: 172.17.0.19
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-2
|
|
namespace: ns
|
|
- ip: 172.17.0.20
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-3
|
|
namespace: ns
|
|
- ip: 172.17.0.21
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-1
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.12`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-2
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.19`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-3
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.20`,
|
|
},
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{
|
|
"172.17.0.12:8989",
|
|
"172.17.0.19:8989",
|
|
"172.17.0.20:8989",
|
|
"172.17.0.21:8989",
|
|
},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: false,
|
|
},
|
|
{
|
|
// Test for the issue described in linkerd/linkerd2#1405.
|
|
serviceType: "local NodePort service with unnamed port",
|
|
k8sConfigs: []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: NodePort
|
|
ports:
|
|
- port: 8989
|
|
targetPort: port1`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Endpoints
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
subsets:
|
|
- addresses:
|
|
- ip: 10.233.66.239
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-f748fb6b4-hpwpw
|
|
namespace: ns
|
|
- ip: 10.233.88.244
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-f748fb6b4-6vcmw
|
|
namespace: ns
|
|
ports:
|
|
- port: 8990
|
|
protocol: TCP`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-f748fb6b4-hpwpw
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
podIp: 10.233.66.239
|
|
phase: Running`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-f748fb6b4-6vcmw
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
podIp: 10.233.88.244
|
|
phase: Running`,
|
|
},
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{
|
|
"10.233.66.239:8990",
|
|
"10.233.88.244:8990",
|
|
},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: false,
|
|
},
|
|
{
|
|
// Test for the issue described in linkerd/linkerd2#1853.
|
|
serviceType: "local service with named target port and differently-named service port",
|
|
k8sConfigs: []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: world
|
|
namespace: ns
|
|
spec:
|
|
type: ClusterIP
|
|
ports:
|
|
- name: app
|
|
port: 7778
|
|
targetPort: http`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Endpoints
|
|
metadata:
|
|
name: world
|
|
namespace: ns
|
|
subsets:
|
|
- addresses:
|
|
- ip: 10.1.30.135
|
|
targetRef:
|
|
kind: Pod
|
|
name: world-575bf846b4-tp4hw
|
|
namespace: ns
|
|
ports:
|
|
- name: app
|
|
port: 7779
|
|
protocol: TCP`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: world-575bf846b4-tp4hw
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
podIp: 10.1.30.135
|
|
phase: Running`,
|
|
},
|
|
id: ServiceID{Name: "world", Namespace: "ns"},
|
|
port: 7778,
|
|
expectedAddresses: []string{
|
|
"10.1.30.135:7779",
|
|
},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: false,
|
|
},
|
|
{
|
|
serviceType: "local services with missing addresses",
|
|
k8sConfigs: []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Endpoints
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
subsets:
|
|
- addresses:
|
|
- ip: 172.17.0.23
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-1
|
|
namespace: ns
|
|
- ip: 172.17.0.24
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-2
|
|
namespace: ns
|
|
- ip: 172.17.0.25
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-3
|
|
namespace: ns
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-3
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.25`,
|
|
},
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{
|
|
"172.17.0.25:8989",
|
|
},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: false,
|
|
},
|
|
{
|
|
serviceType: "local services with no endpoints",
|
|
k8sConfigs: []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name2
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 7979`,
|
|
},
|
|
id: ServiceID{Name: "name2", Namespace: "ns"},
|
|
port: 7979,
|
|
expectedAddresses: []string{},
|
|
expectedNoEndpoints: true,
|
|
expectedNoEndpointsServiceExists: true,
|
|
expectedError: false,
|
|
},
|
|
{
|
|
serviceType: "external name services",
|
|
k8sConfigs: []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name3
|
|
namespace: ns
|
|
spec:
|
|
type: ExternalName
|
|
externalName: foo`,
|
|
},
|
|
id: ServiceID{Name: "name3", Namespace: "ns"},
|
|
port: 6969,
|
|
expectedAddresses: []string{},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: true,
|
|
},
|
|
{
|
|
serviceType: "services that do not yet exist",
|
|
k8sConfigs: []string{},
|
|
id: ServiceID{Name: "name4", Namespace: "ns"},
|
|
port: 5959,
|
|
expectedAddresses: []string{},
|
|
expectedNoEndpoints: true,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: false,
|
|
},
|
|
{
|
|
serviceType: "stateful sets",
|
|
k8sConfigs: []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Endpoints
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
subsets:
|
|
- addresses:
|
|
- ip: 172.17.0.12
|
|
hostname: name1-1
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-1
|
|
namespace: ns
|
|
- ip: 172.17.0.19
|
|
hostname: name1-2
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-2
|
|
namespace: ns
|
|
- ip: 172.17.0.20
|
|
hostname: name1-3
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-3
|
|
namespace: ns
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-1
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.12`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-2
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.19`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-3
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.20`,
|
|
},
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
hostname: "name1-3",
|
|
port: 5959,
|
|
expectedAddresses: []string{"172.17.0.20:5959"},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
},
|
|
{
|
|
serviceType: "local service with new named port mid rollout and two subsets but only first subset is relevant",
|
|
k8sConfigs: []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: ClusterIP
|
|
ports:
|
|
- name: port1
|
|
port: 8989
|
|
targetPort: port1
|
|
- name: port2
|
|
port: 9999
|
|
targetPort: port2`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Endpoints
|
|
metadata:
|
|
labels:
|
|
app: name1
|
|
name: name1
|
|
namespace: ns
|
|
subsets:
|
|
- addresses:
|
|
- ip: 172.17.0.1
|
|
nodeName: name1-1
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-1
|
|
namespace: ns
|
|
- ip: 172.17.0.2
|
|
nodeName: name1-2
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-2
|
|
namespace: ns
|
|
ports:
|
|
- name: port1
|
|
port: 8989
|
|
protocol: TCP
|
|
- addresses:
|
|
- ip: 172.17.0.1
|
|
nodeName: name1-1
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-1
|
|
namespace: ns
|
|
notReadyAddresses:
|
|
- ip: 172.17.0.2
|
|
nodeName: name1-2
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-2
|
|
namespace: ns
|
|
ports:
|
|
- name: port2
|
|
port: 9999
|
|
protocol: TCP
|
|
`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-1
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.1`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-2
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-2
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.2`,
|
|
},
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{
|
|
"172.17.0.1:8989",
|
|
"172.17.0.2:8989",
|
|
},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: false,
|
|
},
|
|
} {
|
|
tt := tt // pin
|
|
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
|
|
// Build a fake API pre-loaded with this case's manifests.
k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
// The fourth argument is false here and true in
// TestEndpointsWatcherWithEndpointSlices; presumably it toggles
// EndpointSlice support — confirm against NewEndpointsWatcher.
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListener()
|
|
|
|
// Subscribe, then check the error expectation before inspecting what the
// listener buffered.
err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
|
|
if tt.expectedError && err == nil {
|
|
t.Fatal("Expected error but was ok")
|
|
}
|
|
if !tt.expectedError && err != nil {
|
|
t.Fatalf("Expected no error, got [%s]", err)
|
|
}
|
|
|
|
listener.ExpectAdded(tt.expectedAddresses, t)
|
|
|
|
if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints {
|
|
t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
|
|
tt.expectedNoEndpoints, listener.endpointsAreNotCalled())
|
|
}
|
|
|
|
if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists {
|
|
t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]",
|
|
tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestEndpointsWatcherWithEndpointSlices runs a table of cases against an
// endpoints watcher created with the fourth NewEndpointsWatcher argument set
// to true. Each case builds a fake API from its manifests, subscribes a
// bufferingEndpointListener for (id, port, hostname) and then checks:
//   - whether Subscribe returned an error (expectedError),
//   - the listener's localTrafficPolicy flag (expectedLocalTrafficPolicy),
//   - the sorted addresses delivered through Add (expectedAddresses),
//   - whether NoEndpoints was called (expectedNoEndpoints), and
//   - the `exists` value NoEndpoints stored (expectedNoEndpointsServiceExists).
func TestEndpointsWatcherWithEndpointSlices(t *testing.T) {
|
|
for _, tt := range []struct {
|
|
serviceType string
|
|
k8sConfigs []string
|
|
id ServiceID
|
|
hostname string
|
|
port Port
|
|
expectedAddresses []string
|
|
expectedNoEndpoints bool
|
|
expectedNoEndpointsServiceExists bool
|
|
expectedError bool
|
|
expectedLocalTrafficPolicy bool
|
|
}{
|
|
{
|
|
serviceType: "local services with EndpointSlice",
|
|
k8sConfigs: []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name-1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989
|
|
internalTrafficPolicy: Local`,
|
|
`
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.12
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name-1-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
- addresses:
|
|
- 172.17.0.19
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name-1-2
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
- addresses:
|
|
- 172.17.0.20
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name-1-3
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-2
|
|
- addresses:
|
|
- 172.17.0.21
|
|
conditions:
|
|
ready: true
|
|
topology:
|
|
kubernetes.io/hostname: node-2
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name-1
|
|
name: name-1-bhnqh
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name-1-1
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.12`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name-1-2
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.19`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name-1-3
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.20`,
|
|
},
|
|
id: ServiceID{Name: "name-1", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{
|
|
"172.17.0.12:8989",
|
|
"172.17.0.19:8989",
|
|
"172.17.0.20:8989",
|
|
"172.17.0.21:8989",
|
|
},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: false,
|
|
expectedLocalTrafficPolicy: true,
|
|
},
|
|
{
|
|
serviceType: "local services with missing addresses and EndpointSlice",
|
|
k8sConfigs: []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name-1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`, `
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.23
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name-1-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
- addresses:
|
|
- 172.17.0.24
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name-1-2
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
- addresses:
|
|
- 172.17.0.25
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name-1-3
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-2
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name-1
|
|
name: name1-f5fad
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name-1-3
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
podIP: 172.17.0.25
|
|
phase: Running`,
|
|
},
|
|
id: ServiceID{Name: "name-1", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{"172.17.0.25:8989"},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: false,
|
|
},
|
|
{
|
|
serviceType: "local services with no EndpointSlices",
|
|
k8sConfigs: []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name-2
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 7979`,
|
|
},
|
|
id: ServiceID{Name: "name-2", Namespace: "ns"},
|
|
port: 7979,
|
|
expectedAddresses: []string{},
|
|
expectedNoEndpoints: true,
|
|
expectedNoEndpointsServiceExists: true,
|
|
expectedError: false,
|
|
},
|
|
{
|
|
serviceType: "external name services with EndpointSlices",
|
|
k8sConfigs: []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name-3-external-svc
|
|
namespace: ns
|
|
spec:
|
|
type: ExternalName
|
|
externalName: foo`,
|
|
},
|
|
id: ServiceID{Name: "name-3-external-svc", Namespace: "ns"},
|
|
port: 7777,
|
|
expectedAddresses: []string{},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: true,
|
|
},
|
|
{
|
|
serviceType: "services that do not exist",
|
|
k8sConfigs: []string{},
|
|
id: ServiceID{Name: "name-4-inexistent-svc", Namespace: "ns"},
|
|
port: 5555,
|
|
expectedAddresses: []string{},
|
|
expectedNoEndpoints: true,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: false,
|
|
},
|
|
{
|
|
serviceType: "stateful sets with EndpointSlices",
|
|
k8sConfigs: []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name-1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`, `
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.12
|
|
conditions:
|
|
ready: true
|
|
hostname: name-1-1
|
|
targetRef:
|
|
kind: Pod
|
|
name: name-1-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
- addresses:
|
|
- 172.17.0.19
|
|
hostname: name-1-2
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name-1-2
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
- addresses:
|
|
- 172.17.0.20
|
|
hostname: name-1-3
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name-1-3
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-2
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name-1
|
|
name: name-1-f5fad
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name-1-1
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.12`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name-1-2
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.19`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name-1-3
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.20`,
|
|
},
|
|
id: ServiceID{Name: "name-1", Namespace: "ns"},
|
|
hostname: "name-1-3",
|
|
port: 6000,
|
|
expectedAddresses: []string{"172.17.0.20:6000"},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: false,
|
|
},
|
|
{
|
|
serviceType: "service with EndpointSlice without labels",
|
|
k8sConfigs: []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name-5
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`, `
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.12
|
|
conditions:
|
|
ready: true
|
|
hostname: name-1-1
|
|
targetRef:
|
|
kind: Pod
|
|
name: name-1-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
name: name-1-f5fad
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name-1-1
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.12`,
|
|
},
|
|
id: ServiceID{Name: "name-5", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{},
|
|
expectedNoEndpoints: true,
|
|
expectedNoEndpointsServiceExists: true,
|
|
expectedError: false,
|
|
},
|
|
{
|
|
serviceType: "service with IPv6 address type EndpointSlice",
|
|
k8sConfigs: []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name-5
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 9000`, `
|
|
addressType: IPv6
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 0:0:0:0:0:0:0:1
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name-5-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
name: name-5-f65dv
|
|
namespace: ns
|
|
ownerReferences:
|
|
- apiVersion: v1
|
|
kind: Service
|
|
name: name-5
|
|
ports:
|
|
- name: ""
|
|
port: 9000`, `
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name-5-1
|
|
namespace: ns
|
|
ownerReferences:
|
|
- kind: ReplicaSet
|
|
name: rs-1
|
|
status:
|
|
phase: Running
|
|
podIP: 0:0:0:0:0:0:0:1`,
|
|
},
|
|
id: ServiceID{Name: "name-5", Namespace: "ns"},
|
|
port: 9000,
|
|
expectedAddresses: []string{},
|
|
expectedNoEndpoints: true,
|
|
expectedNoEndpointsServiceExists: true,
|
|
expectedError: false,
|
|
}} {
|
|
tt := tt // pin
|
|
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
|
|
// Build a fake API pre-loaded with this case's manifests.
k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
// The fourth argument is true here and false in TestEndpointsWatcher;
// presumably it toggles EndpointSlice support — confirm against
// NewEndpointsWatcher.
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListener()
|
|
|
|
// Subscribe, then check the error expectation before inspecting what the
// listener buffered.
err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
|
|
if tt.expectedError && err == nil {
|
|
t.Fatal("Expected error but was ok")
|
|
}
|
|
if !tt.expectedError && err != nil {
|
|
t.Fatalf("Expected no error, got [%s]", err)
|
|
}
|
|
|
|
// NOTE(review): listener.localTrafficPolicy is read here without taking the
// listener's lock, unlike the accessor methods used further down; confirm
// that nothing can still be calling Add/Remove concurrently at this point.
if listener.localTrafficPolicy != tt.expectedLocalTrafficPolicy {
|
|
t.Fatalf("Expected localTrafficPolicy [%v], got [%v]", tt.expectedLocalTrafficPolicy, listener.localTrafficPolicy)
|
|
}
|
|
|
|
listener.ExpectAdded(tt.expectedAddresses, t)
|
|
|
|
if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints {
|
|
t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
|
|
tt.expectedNoEndpoints, listener.endpointsAreNotCalled())
|
|
}
|
|
|
|
if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists {
|
|
t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]",
|
|
tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestEndpointsWatcherWithEndpointSlicesExternalWorkload(t *testing.T) {
|
|
for _, tt := range []struct {
|
|
serviceType string
|
|
k8sConfigs []string
|
|
id ServiceID
|
|
hostname string
|
|
port Port
|
|
expectedAddresses []string
|
|
expectedNoEndpoints bool
|
|
expectedNoEndpointsServiceExists bool
|
|
expectedError bool
|
|
expectedLocalTrafficPolicy bool
|
|
}{
|
|
{
|
|
serviceType: "local services with EndpointSlice",
|
|
k8sConfigs: []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name-1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989
|
|
internalTrafficPolicy: Local`,
|
|
`
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.12
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: ExternalWorkload
|
|
name: name-1-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
- addresses:
|
|
- 172.17.0.19
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: ExternalWorkload
|
|
name: name-1-2
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
- addresses:
|
|
- 172.17.0.20
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: ExternalWorkload
|
|
name: name-1-3
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-2
|
|
- addresses:
|
|
- 172.17.0.21
|
|
conditions:
|
|
ready: true
|
|
topology:
|
|
kubernetes.io/hostname: node-2
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name-1
|
|
name: name-1-bhnqh
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`,
|
|
`
|
|
apiVersion: workload.linkerd.io/v1beta1
|
|
kind: ExternalWorkload
|
|
metadata:
|
|
name: name-1-1
|
|
namespace: ns
|
|
status:
|
|
conditions:
|
|
ready: true`,
|
|
`
|
|
apiVersion: workload.linkerd.io/v1beta1
|
|
kind: ExternalWorkload
|
|
metadata:
|
|
name: name-1-2
|
|
namespace: ns
|
|
status:
|
|
conditions:
|
|
ready: true`,
|
|
`
|
|
apiVersion: workload.linkerd.io/v1beta1
|
|
kind: ExternalWorkload
|
|
metadata:
|
|
name: name-1-3
|
|
namespace: ns
|
|
status:
|
|
conditions:
|
|
ready: true`,
|
|
},
|
|
id: ExternalWorkloadID{Name: "name-1", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{
|
|
"172.17.0.12:8989",
|
|
"172.17.0.19:8989",
|
|
"172.17.0.20:8989",
|
|
"172.17.0.21:8989",
|
|
},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: false,
|
|
expectedLocalTrafficPolicy: true,
|
|
},
|
|
{
|
|
serviceType: "local services with missing addresses and EndpointSlice",
|
|
k8sConfigs: []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name-1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`, `
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.23
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: ExternalWorkload
|
|
name: name-1-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
- addresses:
|
|
- 172.17.0.24
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: ExternalWorkload
|
|
name: name-1-2
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
- addresses:
|
|
- 172.17.0.25
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: ExternalWorkload
|
|
name: name-1-3
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-2
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name-1
|
|
name: name1-f5fad
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `
|
|
apiVersion: workload.linkerd.io/v1beta1
|
|
kind: ExternalWorkload
|
|
metadata:
|
|
name: name-1-3
|
|
namespace: ns
|
|
status:
|
|
conditions:
|
|
ready: true`,
|
|
},
|
|
id: ServiceID{Name: "name-1", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{"172.17.0.25:8989"},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
expectedError: false,
|
|
},
|
|
{
|
|
serviceType: "service with EndpointSlice without labels",
|
|
k8sConfigs: []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name-5
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`, `
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.12
|
|
conditions:
|
|
ready: true
|
|
hostname: name-1-1
|
|
targetRef:
|
|
kind: ExternalWorkload
|
|
name: name-1-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
name: name-1-f5fad
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `
|
|
apiVersion: workload.linkerd.io/v1beta1
|
|
kind: ExternalWorkload
|
|
metadata:
|
|
name: name-1-1
|
|
namespace: ns
|
|
status:
|
|
conditions:
|
|
ready: true`,
|
|
},
|
|
id: ServiceID{Name: "name-5", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{},
|
|
expectedNoEndpoints: true,
|
|
expectedNoEndpointsServiceExists: true,
|
|
expectedError: false,
|
|
},
|
|
|
|
{
|
|
serviceType: "service with IPv6 address type EndpointSlice",
|
|
k8sConfigs: []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name-5
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 9000`, `
|
|
addressType: IPv6
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 0:0:0:0:0:0:0:1
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: ExternalWorkload
|
|
name: name-5-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
name: name-5-f65dv
|
|
namespace: ns
|
|
ownerReferences:
|
|
- apiVersion: v1
|
|
kind: Service
|
|
name: name-5
|
|
ports:
|
|
- name: ""
|
|
port: 9000`, `
|
|
apiVersion: workload.linkerd.io/v1beta1
|
|
kind: ExternalWorkload
|
|
metadata:
|
|
name: name-5-1
|
|
namespace: ns
|
|
status:
|
|
conditions:
|
|
ready: true`,
|
|
},
|
|
id: ServiceID{Name: "name-5", Namespace: "ns"},
|
|
port: 9000,
|
|
expectedAddresses: []string{},
|
|
expectedNoEndpoints: true,
|
|
expectedNoEndpointsServiceExists: true,
|
|
expectedError: false,
|
|
},
|
|
} {
|
|
tt := tt // pin
|
|
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
|
|
k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListener()
|
|
|
|
err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
|
|
if tt.expectedError && err == nil {
|
|
t.Fatal("Expected error but was ok")
|
|
}
|
|
if !tt.expectedError && err != nil {
|
|
t.Fatalf("Expected no error, got [%s]", err)
|
|
}
|
|
|
|
if listener.localTrafficPolicy != tt.expectedLocalTrafficPolicy {
|
|
t.Fatalf("Expected localTrafficPolicy [%v], got [%v]", tt.expectedLocalTrafficPolicy, listener.localTrafficPolicy)
|
|
}
|
|
|
|
listener.ExpectAdded(tt.expectedAddresses, t)
|
|
|
|
if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints {
|
|
t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
|
|
tt.expectedNoEndpoints, listener.endpointsAreNotCalled())
|
|
}
|
|
|
|
if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists {
|
|
t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]",
|
|
tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestEndpointsWatcherDeletion(t *testing.T) {
|
|
k8sConfigs := []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Endpoints
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
subsets:
|
|
- addresses:
|
|
- ip: 172.17.0.12
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-1
|
|
namespace: ns
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-1
|
|
namespace: ns
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.12`}
|
|
|
|
for _, tt := range []struct {
|
|
serviceType string
|
|
k8sConfigs []string
|
|
id ServiceID
|
|
hostname string
|
|
port Port
|
|
objectToDelete interface{}
|
|
deletingServices bool
|
|
}{
|
|
{
|
|
serviceType: "can delete endpoints",
|
|
k8sConfigs: k8sConfigs,
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
hostname: "name1-1",
|
|
objectToDelete: &corev1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}},
|
|
},
|
|
{
|
|
serviceType: "can delete endpoints when wrapped in a DeletedFinalStateUnknown",
|
|
k8sConfigs: k8sConfigs,
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
hostname: "name1-1",
|
|
objectToDelete: &corev1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}},
|
|
},
|
|
{
|
|
serviceType: "can delete services",
|
|
k8sConfigs: k8sConfigs,
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
hostname: "name1-1",
|
|
objectToDelete: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}},
|
|
deletingServices: true,
|
|
},
|
|
{
|
|
serviceType: "can delete services when wrapped in a DeletedFinalStateUnknown",
|
|
k8sConfigs: k8sConfigs,
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
hostname: "name1-1",
|
|
objectToDelete: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}},
|
|
deletingServices: true,
|
|
},
|
|
} {
|
|
|
|
tt := tt // pin
|
|
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
|
|
k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListener()
|
|
|
|
err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
if tt.deletingServices {
|
|
watcher.deleteService(tt.objectToDelete)
|
|
} else {
|
|
watcher.deleteEndpoints(tt.objectToDelete)
|
|
}
|
|
|
|
if !listener.endpointsAreNotCalled() {
|
|
t.Fatal("Expected NoEndpoints to be Called")
|
|
}
|
|
})
|
|
|
|
}
|
|
}
|
|
|
|
func TestEndpointsWatcherDeletionWithEndpointSlices(t *testing.T) {
|
|
k8sConfigsWithES := []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`, `
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.12
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name1
|
|
name: name1-del
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-1
|
|
namespace: ns
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.12`}
|
|
|
|
k8sConfigWithMultipleES := append(k8sConfigsWithES, []string{`
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.13
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-2
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name1
|
|
name: name1-live
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-2
|
|
namespace: ns
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.13`}...)
|
|
|
|
for _, tt := range []struct {
|
|
serviceType string
|
|
k8sConfigs []string
|
|
id ServiceID
|
|
hostname string
|
|
port Port
|
|
objectToDelete interface{}
|
|
deletingServices bool
|
|
hasSliceAccess bool
|
|
noEndpointsCalled bool
|
|
}{
|
|
{
|
|
serviceType: "can delete an EndpointSlice",
|
|
k8sConfigs: k8sConfigsWithES,
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
hostname: "name1-1",
|
|
objectToDelete: createTestEndpointSlice(consts.PodKind),
|
|
hasSliceAccess: true,
|
|
noEndpointsCalled: true,
|
|
},
|
|
{
|
|
serviceType: "can delete an EndpointSlice when wrapped in a DeletedFinalStateUnknown",
|
|
k8sConfigs: k8sConfigsWithES,
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
hostname: "name1-1",
|
|
objectToDelete: createTestEndpointSlice(consts.PodKind),
|
|
hasSliceAccess: true,
|
|
noEndpointsCalled: true,
|
|
},
|
|
{
|
|
serviceType: "can delete an EndpointSlice when there are multiple ones",
|
|
k8sConfigs: k8sConfigWithMultipleES,
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
hostname: "name1-1",
|
|
objectToDelete: createTestEndpointSlice(consts.PodKind),
|
|
hasSliceAccess: true,
|
|
noEndpointsCalled: false,
|
|
},
|
|
} {
|
|
tt := tt // pin
|
|
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
|
|
k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListener()
|
|
|
|
err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
watcher.deleteEndpointSlice(tt.objectToDelete)
|
|
|
|
if listener.endpointsAreNotCalled() != tt.noEndpointsCalled {
|
|
t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
|
|
tt.noEndpointsCalled, listener.endpointsAreNotCalled())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestEndpointsWatcherDeletionWithEndpointSlicesExternalWorkload(t *testing.T) {
|
|
k8sConfigsWithES := []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`, `
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.12
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: ExternalWorkload
|
|
name: name1-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name1
|
|
name: name1-del
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `
|
|
apiVersion: workload.linkerd.io/v1beta1
|
|
kind: ExternalWorkload
|
|
metadata:
|
|
name: name1-1
|
|
namespace: ns
|
|
status:
|
|
conditions:
|
|
ready: true`}
|
|
|
|
k8sConfigWithMultipleES := append(k8sConfigsWithES, []string{`
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.13
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: ExternalWorkload
|
|
name: name1-2
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name1
|
|
name: name1-live
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `apiVersion: workload.linkerd.io/v1beta1
|
|
kind: ExternalWorkload
|
|
metadata:
|
|
name: name1-2
|
|
namespace: ns
|
|
status:
|
|
conditions:
|
|
ready: true`}...)
|
|
|
|
for _, tt := range []struct {
|
|
serviceType string
|
|
k8sConfigs []string
|
|
id ServiceID
|
|
hostname string
|
|
port Port
|
|
objectToDelete interface{}
|
|
deletingServices bool
|
|
hasSliceAccess bool
|
|
noEndpointsCalled bool
|
|
}{
|
|
{
|
|
serviceType: "can delete an EndpointSlice",
|
|
k8sConfigs: k8sConfigsWithES,
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
hostname: "name1-1",
|
|
objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind),
|
|
hasSliceAccess: true,
|
|
noEndpointsCalled: true,
|
|
},
|
|
{
|
|
serviceType: "can delete an EndpointSlice when wrapped in a DeletedFinalStateUnknown",
|
|
k8sConfigs: k8sConfigsWithES,
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
hostname: "name1-1",
|
|
objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind),
|
|
hasSliceAccess: true,
|
|
noEndpointsCalled: true,
|
|
},
|
|
{
|
|
serviceType: "can delete an EndpointSlice when there are multiple ones",
|
|
k8sConfigs: k8sConfigWithMultipleES,
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
hostname: "name1-1",
|
|
objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind),
|
|
hasSliceAccess: true,
|
|
noEndpointsCalled: false,
|
|
},
|
|
} {
|
|
tt := tt // pin
|
|
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
|
|
k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListener()
|
|
|
|
err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
watcher.deleteEndpointSlice(tt.objectToDelete)
|
|
|
|
if listener.endpointsAreNotCalled() != tt.noEndpointsCalled {
|
|
t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
|
|
tt.noEndpointsCalled, listener.endpointsAreNotCalled())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestEndpointsWatcherServiceMirrors(t *testing.T) {
|
|
for _, tt := range []struct {
|
|
serviceType string
|
|
k8sConfigs []string
|
|
id ServiceID
|
|
hostname string
|
|
port Port
|
|
expectedAddresses []string
|
|
expectedNoEndpoints bool
|
|
expectedNoEndpointsServiceExists bool
|
|
enableEndpointSlices bool
|
|
}{
|
|
{
|
|
k8sConfigs: []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1-remote
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Endpoints
|
|
metadata:
|
|
name: name1-remote
|
|
namespace: ns
|
|
annotations:
|
|
mirror.linkerd.io/remote-gateway-identity: "gateway-identity-1"
|
|
mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
|
|
labels:
|
|
mirror.linkerd.io/mirrored-service: "true"
|
|
subsets:
|
|
- addresses:
|
|
- ip: 172.17.0.12
|
|
ports:
|
|
- port: 8989`,
|
|
},
|
|
serviceType: "mirrored service with identity",
|
|
id: ServiceID{Name: "name1-remote", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{
|
|
"172.17.0.12:8989/gateway-identity-1/name1-remote-fq:8989",
|
|
},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
},
|
|
{
|
|
k8sConfigs: []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1-remote
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: discovery.k8s.io/v1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
name: name1-remote-xxxx
|
|
namespace: ns
|
|
annotations:
|
|
mirror.linkerd.io/remote-gateway-identity: "gateway-identity-1"
|
|
mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
|
|
labels:
|
|
mirror.linkerd.io/mirrored-service: "true"
|
|
kubernetes.io/service-name: name1-remote
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.12
|
|
ports:
|
|
- port: 8989`,
|
|
},
|
|
serviceType: "mirrored service with identity and endpoint slices",
|
|
id: ServiceID{Name: "name1-remote", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{
|
|
"172.17.0.12:8989/gateway-identity-1/name1-remote-fq:8989",
|
|
},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
enableEndpointSlices: true,
|
|
},
|
|
{
|
|
k8sConfigs: []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1-remote
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Endpoints
|
|
metadata:
|
|
name: name1-remote
|
|
namespace: ns
|
|
annotations:
|
|
mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
|
|
labels:
|
|
mirror.linkerd.io/mirrored-service: "true"
|
|
subsets:
|
|
- addresses:
|
|
- ip: 172.17.0.12
|
|
ports:
|
|
- port: 8989`,
|
|
},
|
|
serviceType: "mirrored service without identity",
|
|
id: ServiceID{Name: "name1-remote", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{
|
|
"172.17.0.12:8989/name1-remote-fq:8989",
|
|
},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
},
|
|
|
|
{
|
|
k8sConfigs: []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1-remote
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Endpoints
|
|
metadata:
|
|
name: name1-remote
|
|
namespace: ns
|
|
annotations:
|
|
mirror.linkerd.io/remote-gateway-identity: "gateway-identity-1"
|
|
mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
|
|
labels:
|
|
mirror.linkerd.io/mirrored-service: "true"
|
|
subsets:
|
|
- addresses:
|
|
- ip: 172.17.0.12
|
|
ports:
|
|
- port: 9999`,
|
|
},
|
|
serviceType: "mirrored service with remapped port in endpoints",
|
|
id: ServiceID{Name: "name1-remote", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{
|
|
"172.17.0.12:9999/gateway-identity-1/name1-remote-fq:8989",
|
|
},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
},
|
|
{
|
|
k8sConfigs: []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1-remote
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Endpoints
|
|
metadata:
|
|
name: name1-remote
|
|
namespace: ns
|
|
annotations:
|
|
mirror.linkerd.io/remote-gateway-identity: ""
|
|
mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
|
|
labels:
|
|
mirror.linkerd.io/mirrored-service: "true"
|
|
subsets:
|
|
- addresses:
|
|
- ip: 172.17.0.12
|
|
ports:
|
|
- port: 9999`,
|
|
},
|
|
serviceType: "mirrored service with empty identity and remapped port in endpoints",
|
|
id: ServiceID{Name: "name1-remote", Namespace: "ns"},
|
|
port: 8989,
|
|
expectedAddresses: []string{
|
|
"172.17.0.12:9999/name1-remote-fq:8989",
|
|
},
|
|
expectedNoEndpoints: false,
|
|
expectedNoEndpointsServiceExists: false,
|
|
},
|
|
} {
|
|
tt := tt // pin
|
|
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
|
|
k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListener()
|
|
|
|
err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
|
|
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
listener.ExpectAdded(tt.expectedAddresses, t)
|
|
|
|
if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints {
|
|
t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
|
|
tt.expectedNoEndpoints, listener.endpointsAreNotCalled())
|
|
}
|
|
|
|
if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists {
|
|
t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]",
|
|
tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func testPod(resVersion string) *corev1.Pod {
|
|
return &corev1.Pod{
|
|
TypeMeta: metav1.TypeMeta{
|
|
Kind: "Pod",
|
|
APIVersion: "v1",
|
|
},
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
ResourceVersion: resVersion,
|
|
Name: "name1-1",
|
|
Namespace: "ns",
|
|
},
|
|
Status: corev1.PodStatus{
|
|
Phase: corev1.PodRunning,
|
|
PodIP: "172.17.0.12",
|
|
},
|
|
}
|
|
}
|
|
|
|
func endpoints(identity string) *corev1.Endpoints {
|
|
return &corev1.Endpoints{
|
|
TypeMeta: metav1.TypeMeta{
|
|
Kind: "Endpoints",
|
|
APIVersion: "v1",
|
|
},
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "remote-service",
|
|
Namespace: "ns",
|
|
Annotations: map[string]string{
|
|
consts.RemoteGatewayIdentity: identity,
|
|
consts.RemoteServiceFqName: "remote-service.svc.default.cluster.local",
|
|
},
|
|
Labels: map[string]string{
|
|
consts.MirroredResourceLabel: "true",
|
|
},
|
|
},
|
|
Subsets: []corev1.EndpointSubset{
|
|
{
|
|
Addresses: []corev1.EndpointAddress{
|
|
{
|
|
IP: "1.2.3.4",
|
|
},
|
|
},
|
|
Ports: []corev1.EndpointPort{
|
|
{
|
|
Port: 80,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func createTestEndpointSlice(targetRefKind string) *dv1.EndpointSlice {
|
|
return &dv1.EndpointSlice{
|
|
AddressType: "IPv4",
|
|
ObjectMeta: metav1.ObjectMeta{Name: "name1-del", Namespace: "ns", Labels: map[string]string{dv1.LabelServiceName: "name1"}},
|
|
Endpoints: []dv1.Endpoint{
|
|
{
|
|
Addresses: []string{"172.17.0.12"},
|
|
Conditions: dv1.EndpointConditions{Ready: func(b bool) *bool { return &b }(true)},
|
|
TargetRef: &corev1.ObjectReference{Name: "name1-1", Namespace: "ns", Kind: targetRefKind},
|
|
},
|
|
},
|
|
Ports: []dv1.EndpointPort{
|
|
{
|
|
Name: func(s string) *string { return &s }(""),
|
|
Port: func(i int32) *int32 { return &i }(8989),
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func TestEndpointsChangeDetection(t *testing.T) {
|
|
|
|
k8sConfigs := []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: remote-service
|
|
namespace: ns
|
|
spec:
|
|
ports:
|
|
- port: 80
|
|
targetPort: 80`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Endpoints
|
|
metadata:
|
|
name: remote-service
|
|
namespace: ns
|
|
annotations:
|
|
mirror.linkerd.io/remote-gateway-identity: "gateway-identity-1"
|
|
mirror.linkerd.io/remote-svc-fq-name: "remote-service.svc.default.cluster.local"
|
|
labels:
|
|
mirror.linkerd.io/mirrored-service: "true"
|
|
subsets:
|
|
- addresses:
|
|
- ip: 1.2.3.4
|
|
ports:
|
|
- port: 80`,
|
|
}
|
|
|
|
for _, tt := range []struct {
|
|
serviceType string
|
|
id ServiceID
|
|
port Port
|
|
newEndpoints *corev1.Endpoints
|
|
expectedAddresses []string
|
|
}{
|
|
{
|
|
serviceType: "will update endpoints if identity is different",
|
|
id: ServiceID{Name: "remote-service", Namespace: "ns"},
|
|
port: 80,
|
|
newEndpoints: endpoints("gateway-identity-2"),
|
|
expectedAddresses: []string{"1.2.3.4:80/gateway-identity-1/remote-service.svc.default.cluster.local:80", "1.2.3.4:80/gateway-identity-2/remote-service.svc.default.cluster.local:80"},
|
|
},
|
|
|
|
{
|
|
serviceType: "will not update endpoints if identity is the same",
|
|
id: ServiceID{Name: "remote-service", Namespace: "ns"},
|
|
port: 80,
|
|
newEndpoints: endpoints("gateway-identity-1"),
|
|
expectedAddresses: []string{"1.2.3.4:80/gateway-identity-1/remote-service.svc.default.cluster.local:80"},
|
|
},
|
|
} {
|
|
|
|
tt := tt // pin
|
|
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
|
|
k8sAPI, err := k8s.NewFakeAPI(k8sConfigs...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListener()
|
|
|
|
err = watcher.Subscribe(tt.id, tt.port, "", listener)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
|
|
watcher.addEndpoints(tt.newEndpoints)
|
|
|
|
listener.ExpectAdded(tt.expectedAddresses, t)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestPodChangeDetection(t *testing.T) {
|
|
endpoints := &corev1.Endpoints{
|
|
TypeMeta: metav1.TypeMeta{
|
|
Kind: "Endpoints",
|
|
APIVersion: "v1",
|
|
},
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "name1",
|
|
Namespace: "ns",
|
|
},
|
|
Subsets: []corev1.EndpointSubset{
|
|
{
|
|
Addresses: []corev1.EndpointAddress{
|
|
{
|
|
IP: "172.17.0.12",
|
|
Hostname: "name1-1",
|
|
TargetRef: &corev1.ObjectReference{
|
|
Kind: "Pod",
|
|
Namespace: "ns",
|
|
Name: "name1-1",
|
|
},
|
|
},
|
|
},
|
|
Ports: []corev1.EndpointPort{
|
|
{
|
|
Port: 8989,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
k8sConfigs := []string{`
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Endpoints
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
subsets:
|
|
- addresses:
|
|
- ip: 172.17.0.12
|
|
hostname: name1-1
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-1
|
|
namespace: ns
|
|
ports:
|
|
- port: 8989`,
|
|
`
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-1
|
|
namespace: ns
|
|
resourceVersion: "1"
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.12`}
|
|
|
|
for _, tt := range []struct {
|
|
serviceType string
|
|
id ServiceID
|
|
hostname string
|
|
port Port
|
|
newPod *corev1.Pod
|
|
expectedAddresses []string
|
|
}{
|
|
{
|
|
serviceType: "will update pod if resource version is different",
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
hostname: "name1-1",
|
|
newPod: testPod("2"),
|
|
|
|
expectedAddresses: []string{"172.17.0.12:8989:1", "172.17.0.12:8989:2"},
|
|
},
|
|
{
|
|
serviceType: "will not update pod if resource version is the same",
|
|
id: ServiceID{Name: "name1", Namespace: "ns"},
|
|
port: 8989,
|
|
hostname: "name1-1",
|
|
newPod: testPod("1"),
|
|
|
|
expectedAddresses: []string{"172.17.0.12:8989:1"},
|
|
},
|
|
} {
|
|
tt := tt // pin
|
|
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
|
|
k8sAPI, err := k8s.NewFakeAPI(k8sConfigs...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListenerWithResVersion()
|
|
|
|
err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
err = k8sAPI.Pod().Informer().GetStore().Add(tt.newPod)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
k8sAPI.Sync(nil)
|
|
|
|
watcher.addEndpoints(endpoints)
|
|
listener.ExpectAdded(tt.expectedAddresses, t)
|
|
})
|
|
}
|
|
}
|
|
|
|
// Test that when an EndpointSlice is scaled down, the EndpointsWatcher sends
|
|
// all of the Remove events, even if the associated pod / workload is no longer available
|
|
// from the API.
|
|
func TestEndpointSliceScaleDown(t *testing.T) {
|
|
k8sConfigsWithES := []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`, `
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.12
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name1
|
|
name: name1-es
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-1
|
|
namespace: ns
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.12`}
|
|
|
|
// Create an EndpointSlice with one endpoint, backed by a pod.
|
|
|
|
k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListener()
|
|
|
|
err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
|
|
listener.ExpectAdded([]string{"172.17.0.12:8989"}, t)
|
|
|
|
// Delete the backing pod and scale the EndpointSlice to 0 endpoints.
|
|
|
|
err = k8sAPI.Client.CoreV1().Pods("ns").Delete(context.Background(), "name1-1", metav1.DeleteOptions{})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// It may take some time before the pod deletion is recognized by the
|
|
// lister. We wait until the lister sees the pod as deleted.
|
|
err = testutil.RetryFor(time.Second*30, func() error {
|
|
_, err := k8sAPI.Pod().Lister().Pods("ns").Get("name1-1")
|
|
if kerrors.IsNotFound(err) {
|
|
return nil
|
|
}
|
|
if err == nil {
|
|
return errors.New("pod should be deleted, but still exists in lister")
|
|
}
|
|
return err
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
ES, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
emptyES := &dv1.EndpointSlice{
|
|
AddressType: "IPv4",
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "name1-es", Namespace: "ns",
|
|
Labels: map[string]string{dv1.LabelServiceName: "name1"},
|
|
},
|
|
Endpoints: []dv1.Endpoint{},
|
|
Ports: []dv1.EndpointPort{},
|
|
}
|
|
|
|
watcher.updateEndpointSlice(ES, emptyES)
|
|
|
|
// Ensure the watcher emits a remove event.
|
|
|
|
listener.ExpectRemoved([]string{"172.17.0.12:8989"}, t)
|
|
}
|
|
|
|
// Test that when an endpointslice's endpoints change their readiness status to
|
|
// not ready, this is correctly picked up by the subscribers
|
|
func TestEndpointSliceChangeNotReady(t *testing.T) {
|
|
k8sConfigsWithES := []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`, `
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.12
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-1
|
|
namespace: ns
|
|
- addresses:
|
|
- 192.0.2.0
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: ExternalWorkload
|
|
name: wlkd1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name1
|
|
name: name1-es
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-1
|
|
namespace: ns
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.12`, `
|
|
apiVersion: workload.linkerd.io/v1beta1
|
|
kind: ExternalWorkload
|
|
metadata:
|
|
name: wlkd1
|
|
namespace: ns
|
|
spec:
|
|
meshTLS:
|
|
identity: foo
|
|
serverName: foo
|
|
ports:
|
|
- port: 8989
|
|
workloadIPs:
|
|
- ip: 192.0.2.0
|
|
status:
|
|
conditions:
|
|
- type: Ready
|
|
status: "True"
|
|
`,
|
|
}
|
|
|
|
k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListener()
|
|
|
|
err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener.ExpectAdded([]string{"172.17.0.12:8989", "192.0.2.0:8989"}, t)
|
|
|
|
// Change readiness status for pod and for external workload
|
|
es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
unready := false
|
|
es.Endpoints[0].Conditions.Ready = &unready
|
|
es.Endpoints[1].Conditions.Ready = &unready
|
|
|
|
_, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
// Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
listener.ExpectRemoved([]string{"172.17.0.12:8989", "192.0.2.0:8989"}, t)
|
|
}
|
|
|
|
// Test that when an endpointslice's endpoints change their readiness status to
|
|
// ready, this is correctly picked up by the subscribers
|
|
func TestEndpointSliceChangeToReady(t *testing.T) {
|
|
k8sConfigsWithES := []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`, `
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.12
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-1
|
|
namespace: ns
|
|
- addresses:
|
|
- 172.17.0.13
|
|
conditions:
|
|
ready: false
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-2
|
|
namespace: ns
|
|
- addresses:
|
|
- 192.0.2.0
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: ExternalWorkload
|
|
name: wlkd1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
- addresses:
|
|
- 192.0.2.1
|
|
conditions:
|
|
ready: false
|
|
targetRef:
|
|
kind: ExternalWorkload
|
|
name: wlkd2
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name1
|
|
name: name1-es
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-1
|
|
namespace: ns
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.12`, `
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-2
|
|
namespace: ns
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.13`, `
|
|
apiVersion: workload.linkerd.io/v1beta1
|
|
kind: ExternalWorkload
|
|
metadata:
|
|
name: wlkd1
|
|
namespace: ns
|
|
spec:
|
|
meshTLS:
|
|
identity: foo
|
|
serverName: foo
|
|
ports:
|
|
- port: 8989
|
|
workloadIPs:
|
|
- ip: 192.0.2.0
|
|
status:
|
|
conditions:
|
|
- type: Ready
|
|
status: "True"
|
|
`, `
|
|
apiVersion: workload.linkerd.io/v1beta1
|
|
kind: ExternalWorkload
|
|
metadata:
|
|
name: wlkd2
|
|
namespace: ns
|
|
spec:
|
|
meshTLS:
|
|
identity: foo
|
|
serverName: foo
|
|
ports:
|
|
- port: 8989
|
|
workloadIPs:
|
|
- ip: 192.0.2.1
|
|
status:
|
|
conditions:
|
|
- type: Ready
|
|
status: "True"
|
|
`,
|
|
}
|
|
|
|
k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListener()
|
|
|
|
err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
// Expect only two endpoints to be added, the rest are not ready
|
|
listener.ExpectAdded([]string{"172.17.0.12:8989", "192.0.2.0:8989"}, t)
|
|
|
|
es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Change readiness status for pod and for external workload only if they
|
|
// are unready
|
|
rdy := true
|
|
es.Endpoints[1].Conditions.Ready = &rdy
|
|
es.Endpoints[3].Conditions.Ready = &rdy
|
|
|
|
_, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
// Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.13:8989", "192.0.2.0:8989", "192.0.2.1:8989"}, t)
|
|
|
|
}
|
|
|
|
// Test that when an endpointslice gets a hint added, then mark it as a change
|
|
func TestEndpointSliceAddHints(t *testing.T) {
|
|
k8sConfigsWithES := []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`, `
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.12
|
|
conditions:
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name1
|
|
name: name1-es
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-1
|
|
namespace: ns
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.12`}
|
|
|
|
// Create an EndpointSlice with one endpoint, backed by a pod.
|
|
|
|
k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListener()
|
|
|
|
err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener.ExpectAdded([]string{"172.17.0.12:8989"}, t)
|
|
|
|
// Add a hint to the EndpointSlice
|
|
es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
es.Endpoints[0].Hints = &dv1.EndpointHints{
|
|
ForZones: []dv1.ForZone{{Name: "zone1"}},
|
|
}
|
|
|
|
_, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
// Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.12:8989"}, t)
|
|
}
|
|
|
|
// Test that when an endpointslice loses a hint, then mark it as a change
|
|
func TestEndpointSliceRemoveHints(t *testing.T) {
|
|
k8sConfigsWithES := []string{`
|
|
kind: APIResourceList
|
|
apiVersion: v1
|
|
groupVersion: discovery.k8s.io/v1
|
|
resources:
|
|
- name: endpointslices
|
|
singularName: endpointslice
|
|
namespaced: true
|
|
kind: EndpointSlice
|
|
verbs:
|
|
- delete
|
|
- deletecollection
|
|
- get
|
|
- list
|
|
- patch
|
|
- create
|
|
- update
|
|
- watch
|
|
`, `
|
|
apiVersion: v1
|
|
kind: Service
|
|
metadata:
|
|
name: name1
|
|
namespace: ns
|
|
spec:
|
|
type: LoadBalancer
|
|
ports:
|
|
- port: 8989`, `
|
|
addressType: IPv4
|
|
apiVersion: discovery.k8s.io/v1
|
|
endpoints:
|
|
- addresses:
|
|
- 172.17.0.12
|
|
conditions:
|
|
hints:
|
|
forZones:
|
|
- name: zone1
|
|
ready: true
|
|
targetRef:
|
|
kind: Pod
|
|
name: name1-1
|
|
namespace: ns
|
|
topology:
|
|
kubernetes.io/hostname: node-1
|
|
kind: EndpointSlice
|
|
metadata:
|
|
labels:
|
|
kubernetes.io/service-name: name1
|
|
name: name1-es
|
|
namespace: ns
|
|
ports:
|
|
- name: ""
|
|
port: 8989`, `
|
|
apiVersion: v1
|
|
kind: Pod
|
|
metadata:
|
|
name: name1-1
|
|
namespace: ns
|
|
status:
|
|
phase: Running
|
|
podIP: 172.17.0.12`}
|
|
|
|
// Create an EndpointSlice with one endpoint, backed by a pod.
|
|
|
|
k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeAPI returned an error: %s", err)
|
|
}
|
|
|
|
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
|
|
if err != nil {
|
|
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
|
|
}
|
|
|
|
watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
|
|
if err != nil {
|
|
t.Fatalf("can't create Endpoints watcher: %s", err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener := newBufferingEndpointListener()
|
|
|
|
err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
listener.ExpectAdded([]string{"172.17.0.12:8989"}, t)
|
|
|
|
// Remove a hint from the EndpointSlice
|
|
es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
es.Endpoints[0].Hints = &dv1.EndpointHints{
|
|
//ForZones: []dv1.ForZone{{Name: "zone1"}},
|
|
}
|
|
|
|
_, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
k8sAPI.Sync(nil)
|
|
metadataAPI.Sync(nil)
|
|
|
|
// Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.12:8989"}, t)
|
|
}
|