Add utilities for getting EndpointSlices for a Service
Co-Authored-by: Jordan Liggitt <liggitt@google.com> Kubernetes-commit: 41dc2d3b0240c925110152c28fb41339d57d14e7
This commit is contained in:
parent
a2aad46195
commit
c7df1daeb0
|
@ -0,0 +1,114 @@
|
|||
/*
|
||||
Copyright 2025 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
discoveryv1informer "k8s.io/client-go/informers/discovery/v1"
|
||||
discoveryv1lister "k8s.io/client-go/listers/discovery/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// EndpointSliceGetter is an interface for a helper that lets you easily get all
// EndpointSlices for a Service.
type EndpointSliceGetter interface {
	// GetEndpointSlices returns all of the known slices associated with the given
	// service. If there are no slices associated with the service, it will return an
	// empty list, not an error.
	GetEndpointSlices(namespaceName, serviceName string) ([]*discoveryv1.EndpointSlice, error)
}

// indexKey is the name of the cache index that maps "namespace/serviceName"
// keys to the EndpointSlices belonging to that Service.
const indexKey = "namespaceName_serviceName"
|
||||
|
||||
// ensureServiceNameIndexer ensures that indexer has a namespace/serviceName indexer
|
||||
func ensureServiceNameIndexer(indexer cache.Indexer) error {
|
||||
if _, exists := indexer.GetIndexers()[indexKey]; exists {
|
||||
return nil
|
||||
}
|
||||
err := indexer.AddIndexers(map[string]cache.IndexFunc{indexKey: func(obj any) ([]string, error) {
|
||||
ep, ok := obj.(*discoveryv1.EndpointSlice)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expected *discoveryv1.EndpointSlice, got %T", obj)
|
||||
}
|
||||
serviceName, labelExists := ep.Labels[discoveryv1.LabelServiceName]
|
||||
if !labelExists {
|
||||
// Not associated with a service; don't add to this index.
|
||||
return nil, nil
|
||||
}
|
||||
return []string{ep.Namespace + "/" + serviceName}, nil
|
||||
}})
|
||||
if err != nil {
|
||||
// Check if the indexer exists now; if so, that means we were racing with
|
||||
// another thread, and they successfully installed the indexer, so we can
|
||||
// ignore the error.
|
||||
if _, exists := indexer.GetIndexers()[indexKey]; exists {
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// NewEndpointSliceIndexerGetter returns an EndpointSliceGetter that wraps an informer and
// updates its indexes so that you can efficiently find the EndpointSlices associated with
// a Service later. (Note that sliceInformer will continue the additional indexing for as
// long as it runs, even if the EndpointSliceGetter is destroyed. Use
// NewEndpointSliceListerGetter if you want to fetch EndpointSlices without changing
// the underlying cache.)
func NewEndpointSliceIndexerGetter(sliceInformer discoveryv1informer.EndpointSliceInformer) (EndpointSliceGetter, error) {
	indexer := sliceInformer.Informer().GetIndexer()
	if err := ensureServiceNameIndexer(indexer); err != nil {
		return nil, err
	}
	return &endpointSliceIndexerGetter{indexer: indexer}, nil
}
|
||||
|
||||
// endpointSliceIndexerGetter implements EndpointSliceGetter on top of an
// indexer that has the namespace/serviceName index installed (see
// ensureServiceNameIndexer).
type endpointSliceIndexerGetter struct {
	indexer cache.Indexer
}
|
||||
|
||||
func (e *endpointSliceIndexerGetter) GetEndpointSlices(namespaceName, serviceName string) ([]*discoveryv1.EndpointSlice, error) {
|
||||
objs, err := e.indexer.ByIndex(indexKey, namespaceName+"/"+serviceName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
eps := make([]*discoveryv1.EndpointSlice, 0, len(objs))
|
||||
for _, obj := range objs {
|
||||
ep, ok := obj.(*discoveryv1.EndpointSlice)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expected *discoveryv1.EndpointSlice, got %T", obj)
|
||||
}
|
||||
eps = append(eps, ep)
|
||||
}
|
||||
return eps, nil
|
||||
}
|
||||
|
||||
// NewEndpointSliceListerGetter returns an EndpointSliceGetter that uses a lister to do a
// full selection on every lookup.
//
// (The returned error is currently always nil; the signature matches
// NewEndpointSliceIndexerGetter for interchangeability.)
func NewEndpointSliceListerGetter(sliceLister discoveryv1lister.EndpointSliceLister) (EndpointSliceGetter, error) {
	return &endpointSliceListerGetter{lister: sliceLister}, nil
}
|
||||
|
||||
// endpointSliceListerGetter implements EndpointSliceGetter by doing a labeled
// List through a lister on every call (no extra index is maintained).
type endpointSliceListerGetter struct {
	lister discoveryv1lister.EndpointSliceLister
}
|
||||
|
||||
func (e *endpointSliceListerGetter) GetEndpointSlices(namespaceName, serviceName string) ([]*discoveryv1.EndpointSlice, error) {
|
||||
return e.lister.EndpointSlices(namespaceName).List(labels.SelectorFromSet(labels.Set{discoveryv1.LabelServiceName: serviceName}))
|
||||
}
|
|
@ -0,0 +1,200 @@
|
|||
/*
|
||||
Copyright 2025 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/informers"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
func makeEndpointSlice(namespace, service, slice int, ip string) *discoveryv1.EndpointSlice {
|
||||
namespaceName := fmt.Sprintf("namespace%d", namespace)
|
||||
serviceName := fmt.Sprintf("service%d", service)
|
||||
sliceName := fmt.Sprintf("service%d-%d%d%d", service, slice, slice, slice)
|
||||
return &discoveryv1.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: sliceName,
|
||||
Namespace: namespaceName,
|
||||
Labels: map[string]string{
|
||||
discoveryv1.LabelServiceName: serviceName,
|
||||
},
|
||||
},
|
||||
Endpoints: []discoveryv1.Endpoint{
|
||||
{
|
||||
Addresses: []string{ip},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Note that we reuse the same service names in the two namespaces to test proper
// indexing/namespacing.
var (
	// namespace1 service1:
	// - initial: 1 slice
	// - phase1: add 1 slice
	// - phase2: no change
	n1s1FirstSlice    = makeEndpointSlice(1, 1, 1, "10.1.1.1")
	n1s1SecondSlice   = makeEndpointSlice(1, 1, 2, "10.1.1.2")
	n1s1InitialSlices = []*discoveryv1.EndpointSlice{n1s1FirstSlice}
	n1s1Phase1Slices  = []*discoveryv1.EndpointSlice{n1s1FirstSlice, n1s1SecondSlice}
	n1s1Phase2Slices  = []*discoveryv1.EndpointSlice{n1s1FirstSlice, n1s1SecondSlice}

	// namespace1 service2:
	// - initial: 1 slice
	// - phase1: update slice
	// - phase2: delete slice
	n1s2FirstSlice    = makeEndpointSlice(1, 2, 1, "10.1.2.1")
	n1s2UpdatedSlice  = makeEndpointSlice(1, 2, 1, "10.1.2.99")
	n1s2InitialSlices = []*discoveryv1.EndpointSlice{n1s2FirstSlice}
	n1s2Phase1Slices  = []*discoveryv1.EndpointSlice{n1s2UpdatedSlice}
	n1s2Phase2Slices  = []*discoveryv1.EndpointSlice{}

	// namespace2 service 1:
	// - initial: 2 slices
	// - phase1: delete first slice
	// - phase2: delete second slice
	n2s1FirstSlice    = makeEndpointSlice(2, 1, 1, "10.2.1.1")
	n2s1SecondSlice   = makeEndpointSlice(2, 1, 2, "10.2.1.2")
	n2s1InitialSlices = []*discoveryv1.EndpointSlice{n2s1FirstSlice, n2s1SecondSlice}
	n2s1Phase1Slices  = []*discoveryv1.EndpointSlice{n2s1SecondSlice}
	n2s1Phase2Slices  = []*discoveryv1.EndpointSlice{}

	// namespace2 service 2:
	// - initial: no slices
	// - phase1: no change
	// - phase2: create slice
	n2s2FirstSlice    = makeEndpointSlice(2, 2, 1, "10.2.2.1")
	n2s2InitialSlices = []*discoveryv1.EndpointSlice{}
	n2s2Phase1Slices  = []*discoveryv1.EndpointSlice{}
	n2s2Phase2Slices  = []*discoveryv1.EndpointSlice{n2s2FirstSlice}

	// initialSlices is the set of objects preloaded into the fake clientset.
	initialSlices = []runtime.Object{
		n1s1FirstSlice, n1s2FirstSlice, n2s1FirstSlice, n2s1SecondSlice,
	}
)
|
||||
|
||||
func assertSlices(t *testing.T, getter EndpointSliceGetter, namespace, service string, expected []*discoveryv1.EndpointSlice) {
|
||||
t.Helper()
|
||||
|
||||
// Poll because the informers may not sync immediately
|
||||
var lastErr error
|
||||
err := wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
slices, err := getter.GetEndpointSlices(namespace, service)
|
||||
if err != nil {
|
||||
lastErr = fmt.Errorf("unexpected error getting %s/%s slices: %w", namespace, service, err)
|
||||
return false, nil
|
||||
}
|
||||
// cmp.Diff doesn't deal with nil vs []
|
||||
if len(expected) == 0 && len(slices) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
if diff := cmp.Diff(expected, slices); diff != "" {
|
||||
lastErr = fmt.Errorf("slices for %s/%s did not match expectation:\n%s", namespace, service, diff)
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("%s", lastErr)
|
||||
}
|
||||
}
|
||||
|
||||
func testGetter(t *testing.T, client clientset.Interface, getter EndpointSliceGetter) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Check initial state
|
||||
assertSlices(t, getter, "namespace1", "service1", n1s1InitialSlices)
|
||||
assertSlices(t, getter, "namespace1", "service2", n1s2InitialSlices)
|
||||
assertSlices(t, getter, "namespace2", "service1", n2s1InitialSlices)
|
||||
assertSlices(t, getter, "namespace2", "service2", n2s2InitialSlices)
|
||||
|
||||
_, err := client.DiscoveryV1().EndpointSlices("namespace1").Create(ctx, n1s1SecondSlice, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
_, err = client.DiscoveryV1().EndpointSlices("namespace1").Update(ctx, n1s2UpdatedSlice, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
err = client.DiscoveryV1().EndpointSlices("namespace2").Delete(ctx, n2s1FirstSlice.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
assertSlices(t, getter, "namespace1", "service1", n1s1Phase1Slices)
|
||||
assertSlices(t, getter, "namespace1", "service2", n1s2Phase1Slices)
|
||||
assertSlices(t, getter, "namespace2", "service1", n2s1Phase1Slices)
|
||||
assertSlices(t, getter, "namespace2", "service2", n2s2Phase1Slices)
|
||||
|
||||
err = client.DiscoveryV1().EndpointSlices("namespace1").Delete(ctx, n1s2FirstSlice.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
err = client.DiscoveryV1().EndpointSlices("namespace2").Delete(ctx, n2s1SecondSlice.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
_, err = client.DiscoveryV1().EndpointSlices("namespace2").Create(ctx, n2s2FirstSlice, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
assertSlices(t, getter, "namespace1", "service1", n1s1Phase2Slices)
|
||||
assertSlices(t, getter, "namespace1", "service2", n1s2Phase2Slices)
|
||||
assertSlices(t, getter, "namespace2", "service1", n2s1Phase2Slices)
|
||||
assertSlices(t, getter, "namespace2", "service2", n2s2Phase2Slices)
|
||||
}
|
||||
|
||||
func TestNewEndpointSliceIndexerGetter(t *testing.T) {
|
||||
client := clientsetfake.NewSimpleClientset(initialSlices...)
|
||||
informerFactory := informers.NewSharedInformerFactory(client, 30*time.Second)
|
||||
getter, err := NewEndpointSliceIndexerGetter(informerFactory.Discovery().V1().EndpointSlices())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
informerFactory.Start(wait.NeverStop)
|
||||
cache.WaitForCacheSync(nil, informerFactory.Discovery().V1().EndpointSlices().Informer().HasSynced)
|
||||
|
||||
testGetter(t, client, getter)
|
||||
}
|
||||
|
||||
func TestNewEndpointSliceListerGetter(t *testing.T) {
|
||||
client := clientsetfake.NewSimpleClientset(initialSlices...)
|
||||
informerFactory := informers.NewSharedInformerFactory(client, 30*time.Second)
|
||||
getter, err := NewEndpointSliceListerGetter(informerFactory.Discovery().V1().EndpointSlices().Lister())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
informerFactory.Start(wait.NeverStop)
|
||||
cache.WaitForCacheSync(nil, informerFactory.Discovery().V1().EndpointSlices().Informer().HasSynced)
|
||||
|
||||
testGetter(t, client, getter)
|
||||
}
|
Loading…
Reference in New Issue