Merge pull request #128402 from richabanker/mvp-agg-discovery

KEP-4020: Replace the StorageVersion API with aggregated discovery to fetch the resources served by a peer apiserver

Kubernetes-commit: a6227695ab10a79219c253c94e65c0ee1c4cf18d
Kubernetes Publisher, 2025-03-18 21:43:49 -07:00, commit 1a83f0ce07
6 changed files with 974 additions and 393 deletions
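The change below teaches a kube-apiserver to learn which group/version/resources each peer serves by querying the peer's aggregated discovery endpoints (/api and /apis) rather than reading StorageVersion objects. For orientation, here is a minimal sketch, not code from this commit: the base URL, HTTP client, and the fetchServedGVRs name are assumptions, and the Accept value mirrors the v2 aggregated-discovery content type used by client-go.

package peerdiscoveryexample

import (
	"encoding/json"
	"fmt"
	"net/http"

	apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// acceptV2 asks the server for the v2 APIGroupDiscoveryList document.
const acceptV2 = "application/json;g=apidiscovery.k8s.io;v=v2;as=APIGroupDiscoveryList"

// fetchServedGVRs fetches one aggregated discovery document ("/api" or "/apis")
// and flattens it into the set of GVRs the server claims to serve.
func fetchServedGVRs(client *http.Client, baseURL, path string) (map[schema.GroupVersionResource]bool, error) {
	req, err := http.NewRequest(http.MethodGet, baseURL+path, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", acceptV2)

	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("discovery request failed with status: %d", resp.StatusCode)
	}

	var discoveryList apidiscoveryv2.APIGroupDiscoveryList
	if err := json.NewDecoder(resp.Body).Decode(&discoveryList); err != nil {
		return nil, err
	}

	served := map[schema.GroupVersionResource]bool{}
	for _, group := range discoveryList.Items {
		groupName := group.Name
		if groupName == "" {
			// The legacy group served under /api has an empty name.
			groupName = "core"
		}
		for _, version := range group.Versions {
			for _, resource := range version.Resources {
				served[schema.GroupVersionResource{Group: groupName, Version: version.Version, Resource: resource.Resource}] = true
			}
		}
	}
	return served, nil
}

The handler added in this commit performs the same flattening, but it sends the request through its peer-proxy transport and decodes the response with an apidiscovery v2 codec instead of plain JSON decoding.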


@ -50,6 +50,11 @@ func Register() {
 	})
 }
 
+// Only used for tests.
+func Reset() {
+	legacyregistry.Reset()
+}
+
 // IncPeerProxiedRequest increments the # of proxied requests to peer kube-apiserver
 func IncPeerProxiedRequest(ctx context.Context, status string) {
 	peerProxiedRequestsTotal.WithContext(ctx).WithLabelValues(status).Add(1)


@ -0,0 +1,198 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peerproxy
import (
"context"
"fmt"
"net/http"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/client-go/discovery"
"k8s.io/klog/v2"
apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
v1 "k8s.io/api/coordination/v1"
schema "k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
responsewriterutil "k8s.io/apiserver/pkg/util/responsewriter"
)
const (
controllerName = "peer-discovery-cache-sync"
maxRetries = 5
)
func (h *peerProxyHandler) RunPeerDiscoveryCacheSync(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer h.peerLeaseQueue.ShutDown()
defer func() {
err := h.apiserverIdentityInformer.Informer().RemoveEventHandler(h.leaseRegistration)
if err != nil {
klog.Warning("error removing leaseInformer eventhandler")
}
}()
klog.Infof("Workers: %d", workers)
for i := 0; i < workers; i++ {
klog.Infof("Starting worker")
go wait.UntilWithContext(ctx, h.runWorker, time.Second)
}
<-ctx.Done()
}
func (h *peerProxyHandler) enqueueLease(lease *v1.Lease) {
h.peerLeaseQueue.Add(lease.Name)
}
func (h *peerProxyHandler) runWorker(ctx context.Context) {
for h.processNextElectionItem(ctx) {
}
}
func (h *peerProxyHandler) processNextElectionItem(ctx context.Context) bool {
key, shutdown := h.peerLeaseQueue.Get()
if shutdown {
return false
}
defer h.peerLeaseQueue.Done(key)
err := h.syncPeerDiscoveryCache(ctx)
h.handleErr(err, key)
return true
}
func (h *peerProxyHandler) syncPeerDiscoveryCache(ctx context.Context) error {
var fetchDiscoveryErr error
// Rebuild the peer discovery cache from available leases.
leases, err := h.apiserverIdentityInformer.Lister().List(h.identityLeaseLabelSelector)
if err != nil {
utilruntime.HandleError(err)
return err
}
newCache := map[string]map[schema.GroupVersionResource]bool{}
for _, l := range leases {
_, ok := h.isValidPeerIdentityLease(l)
if !ok {
continue
}
discoveryInfo, err := h.fetchNewDiscoveryFor(ctx, l.Name, *l.Spec.HolderIdentity)
if err != nil {
fetchDiscoveryErr = err
}
if discoveryInfo != nil {
newCache[l.Name] = discoveryInfo
}
}
// Overwrite cache with new contents.
h.peerDiscoveryInfoCache.Store(newCache)
return fetchDiscoveryErr
}
func (h *peerProxyHandler) fetchNewDiscoveryFor(ctx context.Context, serverID string, holderIdentity string) (map[schema.GroupVersionResource]bool, error) {
hostport, err := h.hostportInfo(serverID)
if err != nil {
return nil, fmt.Errorf("failed to get host port info from identity lease for server %s: %w", serverID, err)
}
klog.V(4).Infof("Proxying an agg-discovery call from %s to %s", h.serverID, serverID)
servedResources := make(map[schema.GroupVersionResource]bool)
var discoveryErr error
var discoveryResponse *apidiscoveryv2.APIGroupDiscoveryList
discoveryPaths := []string{"/api", "/apis"}
for _, path := range discoveryPaths {
discoveryResponse, discoveryErr = h.aggregateDiscovery(ctx, path, hostport)
if discoveryErr != nil {
klog.ErrorS(discoveryErr, "error querying discovery endpoint for serverID", "path", path, "serverID", serverID)
continue
}
for _, groupDiscovery := range discoveryResponse.Items {
groupName := groupDiscovery.Name
if groupName == "" {
groupName = "core"
}
for _, version := range groupDiscovery.Versions {
for _, resource := range version.Resources {
gvr := schema.GroupVersionResource{Group: groupName, Version: version.Version, Resource: resource.Resource}
servedResources[gvr] = true
}
}
}
}
klog.V(4).Infof("Agg discovery done successfully by %s for %s", h.serverID, serverID)
return servedResources, discoveryErr
}
func (h *peerProxyHandler) aggregateDiscovery(ctx context.Context, path string, hostport string) (*apidiscoveryv2.APIGroupDiscoveryList, error) {
req, err := http.NewRequest(http.MethodGet, path, nil)
if err != nil {
return nil, err
}
apiServerUser := &user.DefaultInfo{
Name: user.APIServerUser,
UID: user.APIServerUser,
Groups: []string{user.AllAuthenticated},
}
ctx = apirequest.WithUser(ctx, apiServerUser)
req = req.WithContext(ctx)
req.Header.Add("Accept", discovery.AcceptV2)
writer := responsewriterutil.NewInMemoryResponseWriter()
h.proxyRequestToDestinationAPIServer(req, writer, hostport)
if writer.RespCode() != http.StatusOK {
return nil, fmt.Errorf("discovery request failed with status: %d", writer.RespCode())
}
parsed := &apidiscoveryv2.APIGroupDiscoveryList{}
if err := runtime.DecodeInto(h.discoverySerializer.UniversalDecoder(), writer.Data(), parsed); err != nil {
return nil, fmt.Errorf("error decoding discovery response: %w", err)
}
return parsed, nil
}
// handleErr checks if an error happened and makes sure we will retry later.
func (h *peerProxyHandler) handleErr(err error, key string) {
if err == nil {
h.peerLeaseQueue.Forget(key)
return
}
if h.peerLeaseQueue.NumRequeues(key) < maxRetries {
klog.Infof("Error syncing discovery for peer lease %v: %v", key, err)
h.peerLeaseQueue.AddRateLimited(key)
return
}
h.peerLeaseQueue.Forget(key)
utilruntime.HandleError(err)
klog.Infof("Dropping lease %s out of the queue: %v", key, err)
}
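The cache produced by syncPeerDiscoveryCache is a plain map keyed by peer server ID and is swapped in wholesale through an atomic.Value, so request handlers never observe a partially rebuilt cache. A minimal sketch of that access pattern, with hypothetical names that are not part of this file:

package peercacheexample

import (
	"sync/atomic"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

type peerCache struct {
	v atomic.Value // holds map[string]map[schema.GroupVersionResource]bool
}

func newPeerCache() *peerCache {
	c := &peerCache{}
	c.v.Store(map[string]map[schema.GroupVersionResource]bool{})
	return c
}

// replace swaps in a freshly built cache, mirroring how syncPeerDiscoveryCache
// builds newCache and then calls Store once.
func (c *peerCache) replace(fresh map[string]map[schema.GroupVersionResource]bool) {
	c.v.Store(fresh)
}

// peersServing returns the IDs of peers whose cached discovery lists gvr.
func (c *peerCache) peersServing(gvr schema.GroupVersionResource) []string {
	cache := c.v.Load().(map[string]map[schema.GroupVersionResource]bool)
	var ids []string
	for id, served := range cache {
		if served[gvr] {
			ids = append(ids, id)
		}
	}
	return ids
}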


@ -0,0 +1,344 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peerproxy
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
v1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestRunPeerDiscoveryCacheSync(t *testing.T) {
localServerID := "local-server"
testCases := []struct {
desc string
leases []*v1.Lease
labelSelectorString string
updatedLease *v1.Lease
deletedLeaseNames []string
wantCache map[string]map[schema.GroupVersionResource]bool
}{
{
desc: "single remote server",
labelSelectorString: "apiserver-identity=testserver",
leases: []*v1.Lease{
{
ObjectMeta: metav1.ObjectMeta{
Name: "remote-1",
Labels: map[string]string{"apiserver-identity": "testserver"},
},
Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-1")},
},
},
wantCache: map[string]map[schema.GroupVersionResource]bool{
"remote-1": {
{Group: "testgroup", Version: "v1", Resource: "testresources"}: true,
},
},
},
{
desc: "multiple remote servers",
labelSelectorString: "apiserver-identity=testserver",
leases: []*v1.Lease{
{
ObjectMeta: metav1.ObjectMeta{
Name: "remote-1",
Labels: map[string]string{"apiserver-identity": "testserver"},
},
Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-1")},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "remote-2",
Labels: map[string]string{"apiserver-identity": "testserver"},
},
Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-2")},
},
},
wantCache: map[string]map[schema.GroupVersionResource]bool{
"remote-1": {
{Group: "testgroup", Version: "v1", Resource: "testresources"}: true,
},
"remote-2": {
{Group: "testgroup", Version: "v1", Resource: "testresources"}: true,
},
},
},
{
desc: "lease update",
labelSelectorString: "apiserver-identity=testserver",
leases: []*v1.Lease{
{
ObjectMeta: metav1.ObjectMeta{
Name: "remote-1",
Labels: map[string]string{"apiserver-identity": "testserver"},
},
Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-1")},
},
},
updatedLease: &v1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "remote-1",
Labels: map[string]string{"apiserver-identity": "testserver"},
},
Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-2")},
},
wantCache: map[string]map[schema.GroupVersionResource]bool{
"remote-1": {
{Group: "testgroup", Version: "v1", Resource: "testresources"}: true,
},
},
},
{
desc: "lease deletion",
labelSelectorString: "apiserver-identity=testserver",
leases: []*v1.Lease{
{
ObjectMeta: metav1.ObjectMeta{
Name: "remote-1",
Labels: map[string]string{"apiserver-identity": "testserver"},
},
Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-1")},
},
},
deletedLeaseNames: []string{"remote-1"},
wantCache: map[string]map[schema.GroupVersionResource]bool{},
},
}
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
leaseInformer := fakeInformerFactory.Coordination().V1().Leases()
fakeReconciler := newFakeReconciler()
negotiatedSerializer := serializer.NewCodecFactory(runtime.NewScheme())
loopbackConfig := &rest.Config{}
proxyConfig := &transport.Config{
TLS: transport.TLSConfig{Insecure: true},
}
h, err := NewPeerProxyHandler(
localServerID,
tt.labelSelectorString,
leaseInformer,
fakeReconciler,
negotiatedSerializer,
loopbackConfig,
proxyConfig,
)
if err != nil {
t.Fatalf("failed to create handler: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Add leases to the fake client and informer.
for _, lease := range tt.leases {
_, err := fakeClient.CoordinationV1().Leases("default").Create(ctx, lease, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}
if err = leaseInformer.Informer().GetIndexer().Add(lease); err != nil {
t.Fatalf("failed to create lease: %v", err)
}
}
go fakeInformerFactory.Start(ctx.Done())
cache.WaitForCacheSync(ctx.Done(), leaseInformer.Informer().HasSynced)
// Create test servers based on leases
testServers := make(map[string]*httptest.Server)
for _, lease := range tt.leases {
testServer := newTestTLSServer(t)
defer testServer.Close()
testServers[lease.Name] = testServer
}
// Modify the reconciler to return the test server URLs
for name, server := range testServers {
fakeReconciler.setEndpoint(name, server.URL[8:])
}
go h.RunPeerDiscoveryCacheSync(ctx, 1)
// Wait for initial cache update.
initialCache := map[string]map[schema.GroupVersionResource]bool{}
for _, lease := range tt.leases {
initialCache[lease.Name] = map[schema.GroupVersionResource]bool{
{Group: "testgroup", Version: "v1", Resource: "testresources"}: true,
}
}
err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
gotCache := h.peerDiscoveryInfoCache.Load()
return assert.ObjectsAreEqual(initialCache, gotCache), nil
})
if err != nil {
t.Errorf("initial cache update failed: %v", err)
}
// Update the lease if indicated.
if tt.updatedLease != nil {
updatedLease := tt.updatedLease.DeepCopy()
_, err = fakeClient.CoordinationV1().Leases("default").Update(ctx, updatedLease, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("failed to update lease: %v", err)
}
if err = leaseInformer.Informer().GetIndexer().Update(updatedLease); err != nil {
t.Fatalf("failed to update lease: %v", err)
}
}
// Delete leases if indicated.
if len(tt.deletedLeaseNames) > 0 {
for _, leaseName := range tt.deletedLeaseNames {
lease, exists, err := leaseInformer.Informer().GetIndexer().GetByKey("default/" + leaseName)
if err != nil {
t.Fatalf("failed to get lease from indexer: %v", err)
}
if !exists {
t.Fatalf("lease %s not found", leaseName)
}
deletedLease := lease.(*v1.Lease)
err = fakeClient.CoordinationV1().Leases("default").Delete(ctx, deletedLease.Name, metav1.DeleteOptions{})
if err != nil {
t.Fatalf("failed to delete lease: %v", err)
}
if err = leaseInformer.Informer().GetIndexer().Delete(deletedLease); err != nil {
t.Fatalf("failed to delete lease: %v", err)
}
}
}
err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
gotCache := h.peerDiscoveryInfoCache.Load()
return assert.ObjectsAreEqual(tt.wantCache, gotCache), nil
})
if err != nil {
t.Errorf("cache doesnt match expectation: %v", err)
}
})
}
}
// newTestTLSServer creates a new httptest.NewTLSServer that serves discovery endpoints.
func newTestTLSServer(t *testing.T) *httptest.Server {
return httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/apis" || r.URL.Path == "/api" {
discoveryResponse := &apidiscoveryv2.APIGroupDiscoveryList{
Items: []apidiscoveryv2.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{
Name: "testgroup",
},
Versions: []apidiscoveryv2.APIVersionDiscovery{
{
Version: "v1",
Resources: []apidiscoveryv2.APIResourceDiscovery{
{Resource: "testresources"},
},
},
},
},
},
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(discoveryResponse); err != nil {
t.Fatalf("error recording discovery response")
}
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
}
type fakeReconciler struct {
endpoints map[string]string
}
func newFakeReconciler() *fakeReconciler {
return &fakeReconciler{
endpoints: make(map[string]string),
}
}
func (f *fakeReconciler) UpdateLease(serverID string, publicIP string, ports []corev1.EndpointPort) error {
return nil
}
func (f *fakeReconciler) DeleteLease(serverID string) error {
return nil
}
func (f *fakeReconciler) Destroy() {
}
func (f *fakeReconciler) GetEndpoint(serverID string) (string, error) {
endpoint, ok := f.endpoints[serverID]
if !ok {
return "", fmt.Errorf("endpoint not found for serverID: %s", serverID)
}
return endpoint, nil
}
func (f *fakeReconciler) RemoveLease(serverID string) error {
return nil
}
func (f *fakeReconciler) StopReconciling() {
}
func (f *fakeReconciler) setEndpoint(serverID, endpoint string) {
f.endpoints[serverID] = endpoint
}


@ -17,51 +17,171 @@ limitations under the License.
 package peerproxy
 
 import (
+	"context"
+	"fmt"
 	"net/http"
-	"sync"
+	"strings"
+	"sync/atomic"
+	"time"
 
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
 	"k8s.io/apiserver/pkg/reconcilers"
-	"k8s.io/apiserver/pkg/storageversion"
-	kubeinformers "k8s.io/client-go/informers"
+	"k8s.io/client-go/discovery"
+	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/transport"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+
+	apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
+	v1 "k8s.io/api/coordination/v1"
+	schema "k8s.io/apimachinery/pkg/runtime/schema"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	coordinationv1informers "k8s.io/client-go/informers/coordination/v1"
 )
 
-// Interface defines how the Unknown Version Proxy filter interacts with the underlying system.
+// Local discovery cache needs to be refreshed periodically to store
+// updates made to custom resources or aggregated resource that can
+// change dynamically.
+const localDiscoveryRefreshInterval = 30 * time.Minute
+
+// Interface defines how the Mixed Version Proxy filter interacts with the underlying system.
 type Interface interface {
 	WrapHandler(handler http.Handler) http.Handler
 	WaitForCacheSync(stopCh <-chan struct{}) error
 	HasFinishedSync() bool
+	RunLocalDiscoveryCacheSync(stopCh <-chan struct{}) error
+	RunPeerDiscoveryCacheSync(ctx context.Context, workers int)
 }
 
 // New creates a new instance to implement unknown version proxy
-func NewPeerProxyHandler(informerFactory kubeinformers.SharedInformerFactory,
-	svm storageversion.Manager,
-	proxyTransport http.RoundTripper,
-	serverId string,
-	reconciler reconcilers.PeerEndpointLeaseReconciler,
-	serializer runtime.NegotiatedSerializer) *peerProxyHandler {
+// This method is used for an alpha feature UnknownVersionInteroperabilityProxy
+// and is subject to future modifications.
+func NewPeerProxyHandler(
+	serverId string,
+	identityLeaseLabelSelector string,
+	leaseInformer coordinationv1informers.LeaseInformer,
+	reconciler reconcilers.PeerEndpointLeaseReconciler,
+	ser runtime.NegotiatedSerializer,
+	loopbackClientConfig *rest.Config,
+	proxyClientConfig *transport.Config,
+) (*peerProxyHandler, error) {
 	h := &peerProxyHandler{
-		name:                  "PeerProxyHandler",
-		storageversionManager: svm,
-		proxyTransport:        proxyTransport,
-		svMap:                 sync.Map{},
-		serverId:              serverId,
-		reconciler:            reconciler,
-		serializer:            serializer,
+		name:                             "PeerProxyHandler",
+		serverID:                         serverId,
+		reconciler:                       reconciler,
+		serializer:                       ser,
+		localDiscoveryInfoCache:          atomic.Value{},
+		localDiscoveryCacheTicker:        time.NewTicker(localDiscoveryRefreshInterval),
+		localDiscoveryInfoCachePopulated: make(chan struct{}),
+		peerDiscoveryInfoCache:           atomic.Value{},
+		peerLeaseQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
+			workqueue.DefaultTypedControllerRateLimiter[string](),
+			workqueue.TypedRateLimitingQueueConfig[string]{
+				Name: controllerName,
+			}),
+		apiserverIdentityInformer: leaseInformer,
 	}
-	svi := informerFactory.Internal().V1alpha1().StorageVersions()
-	h.storageversionInformer = svi.Informer()
-	svi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+
+	if parts := strings.Split(identityLeaseLabelSelector, "="); len(parts) != 2 {
+		return nil, fmt.Errorf("invalid identityLeaseLabelSelector provided, must be of the form key=value, received: %v", identityLeaseLabelSelector)
+	}
+	selector, err := labels.Parse(identityLeaseLabelSelector)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse label selector: %w", err)
+	}
+	h.identityLeaseLabelSelector = selector
+
+	discoveryScheme := runtime.NewScheme()
+	utilruntime.Must(apidiscoveryv2.AddToScheme(discoveryScheme))
+	h.discoverySerializer = serializer.NewCodecFactory(discoveryScheme)
+
+	discoveryClient, err := discovery.NewDiscoveryClientForConfig(loopbackClientConfig)
+	if err != nil {
+		return nil, fmt.Errorf("error creating discovery client: %w", err)
+	}
+	h.discoveryClient = discoveryClient
+	h.localDiscoveryInfoCache.Store(map[schema.GroupVersionResource]bool{})
+	h.peerDiscoveryInfoCache.Store(map[string]map[schema.GroupVersionResource]bool{})
+
+	proxyTransport, err := transport.New(proxyClientConfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create proxy transport: %w", err)
+	}
+	h.proxyTransport = proxyTransport
+
+	peerDiscoveryRegistration, err := h.apiserverIdentityInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
-			h.addSV(obj)
+			if lease, ok := h.isValidPeerIdentityLease(obj); ok {
+				h.enqueueLease(lease)
+			}
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
-			h.updateSV(oldObj, newObj)
+			oldLease, oldLeaseOk := h.isValidPeerIdentityLease(oldObj)
+			newLease, newLeaseOk := h.isValidPeerIdentityLease(newObj)
+			if oldLeaseOk && newLeaseOk &&
+				oldLease.Name == newLease.Name && *oldLease.Spec.HolderIdentity != *newLease.Spec.HolderIdentity {
+				h.enqueueLease(newLease)
+			}
 		},
 		DeleteFunc: func(obj interface{}) {
-			h.deleteSV(obj)
-		}})
-	return h
+			if lease, ok := h.isValidPeerIdentityLease(obj); ok {
+				h.enqueueLease(lease)
+			}
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+	h.leaseRegistration = peerDiscoveryRegistration
+	return h, nil
+}
+
+func (h *peerProxyHandler) isValidPeerIdentityLease(obj interface{}) (*v1.Lease, bool) {
+	lease, ok := obj.(*v1.Lease)
+	if !ok {
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
+			return nil, false
+		}
+		if lease, ok = tombstone.Obj.(*v1.Lease); !ok {
+			utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
+			return nil, false
+		}
+	}
+
+	if lease == nil {
+		klog.Error(fmt.Errorf("nil lease object provided"))
+		return nil, false
+	}
+
+	if h.identityLeaseLabelSelector != nil && h.identityLeaseLabelSelector.String() != "" {
+		identityLeaseLabel := strings.Split(h.identityLeaseLabelSelector.String(), "=")
+		if len(identityLeaseLabel) != 2 {
+			klog.Errorf("invalid identityLeaseLabelSelector format: %s", h.identityLeaseLabelSelector.String())
+			return nil, false
+		}
+		if lease.Labels == nil || lease.Labels[identityLeaseLabel[0]] != identityLeaseLabel[1] {
+			klog.V(4).Infof("lease %s/%s does not match label selector: %s=%s", lease.Namespace, lease.Name, identityLeaseLabel[0], identityLeaseLabel[1])
+			return nil, false
+		}
+	}
+
+	// Ignore self.
+	if lease.Name == h.serverID {
+		return nil, false
+	}
+
+	if lease.Spec.HolderIdentity == nil {
+		klog.Error(fmt.Errorf("invalid lease object provided, missing holderIdentity in lease obj"))
+		return nil, false
+	}
+
+	return lease, true
 }
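With the new Interface methods above, a caller is expected to populate the local discovery cache, start the peer-lease workers, and gate traffic on WaitForCacheSync. The sketch below shows one possible wiring; it is illustrative only and does not reproduce the kube-apiserver's actual startup sequence.

package peerproxyexample

import (
	"context"
	"net/http"

	"k8s.io/apiserver/pkg/util/peerproxy"
)

// runPeerProxy wires an already-constructed peer proxy handler into a handler
// chain. All setup outside the Interface methods is assumed to have happened.
func runPeerProxy(ctx context.Context, pp peerproxy.Interface, apiHandler http.Handler) (http.Handler, error) {
	// Wrap the API handler so requests for resources this server cannot serve
	// can be proxied to a peer.
	wrapped := pp.WrapHandler(apiHandler)

	// Populate the local discovery cache and keep refreshing it periodically.
	if err := pp.RunLocalDiscoveryCacheSync(ctx.Done()); err != nil {
		return nil, err
	}

	// Keep the peer discovery cache in sync with apiserver identity leases.
	go pp.RunPeerDiscoveryCacheSync(ctx, 1)

	// Block external serving until informers and caches are ready.
	if err := pp.WaitForCacheSync(ctx.Done()); err != nil {
		return nil, err
	}
	return wrapped, nil
}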


@ -25,26 +25,30 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"strconv" "strconv"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"k8s.io/api/apiserverinternal/v1alpha1" "k8s.io/apimachinery/pkg/labels"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
schema "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/proxy" "k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/endpoints/responsewriter" "k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/apiserver/pkg/reconcilers" "k8s.io/apiserver/pkg/reconcilers"
"k8s.io/apiserver/pkg/storageversion"
"k8s.io/apiserver/pkg/util/peerproxy/metrics" "k8s.io/apiserver/pkg/util/peerproxy/metrics"
apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy" "k8s.io/client-go/discovery"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport" "k8s.io/client-go/transport"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2" "k8s.io/klog/v2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
schema "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy"
coordinationv1informers "k8s.io/client-go/informers/coordination/v1"
) )
const ( const (
@ -53,33 +57,34 @@ const (
 type peerProxyHandler struct {
 	name string
-	// StorageVersion informer used to fetch apiserver ids than can serve a resource
-	storageversionInformer cache.SharedIndexInformer
-	// StorageVersion manager used to ensure it has finished updating storageversions before
-	// we start handling external requests
-	storageversionManager storageversion.Manager
-	// proxy transport
-	proxyTransport http.RoundTripper
-	// identity for this server
-	serverId string
-	// reconciler that is used to fetch host port of peer apiserver when proxying request to a peer
-	reconciler reconcilers.PeerEndpointLeaseReconciler
-	serializer runtime.NegotiatedSerializer
-	// SyncMap for storing an up to date copy of the storageversions and apiservers that can serve them
-	// This map is populated using the StorageVersion informer
-	// This map has key set to GVR and value being another SyncMap
-	// The nested SyncMap has key set to apiserver id and value set to boolean
-	// The nested maps are created to have a "Set" like structure to store unique apiserver ids
-	// for a given GVR
-	svMap sync.Map
+
+	// Identity for this server.
+	serverID string
+
 	finishedSync atomic.Bool
+
+	// Label to check against in identity leases to make sure
+	// we are working with apiserver identity leases only.
+	identityLeaseLabelSelector labels.Selector
+	apiserverIdentityInformer  coordinationv1informers.LeaseInformer
+	leaseRegistration          cache.ResourceEventHandlerRegistration
+
+	// Reconciler that is used to fetch host port of peer apiserver when proxying request to a peer.
+	reconciler reconcilers.PeerEndpointLeaseReconciler
+
+	// Client to make discovery calls locally.
+	discoveryClient     *discovery.DiscoveryClient
+	discoverySerializer serializer.CodecFactory
+
+	// Cache that stores resources served by this apiserver. Refreshed periodically.
+	// We always look up in the local discovery cache first, to check whether the
+	// request can be served by this apiserver instead of proxying it to a peer.
+	localDiscoveryInfoCache              atomic.Value
+	localDiscoveryCacheTicker            *time.Ticker
+	localDiscoveryInfoCachePopulated     chan struct{}
+	localDiscoveryInfoCachePopulatedOnce sync.Once
+
+	// Cache that stores resources served by peer apiservers.
+	// Refreshed if a new apiserver identity lease is added, deleted or
+	// holderIdentity change is observed in the lease.
+	peerDiscoveryInfoCache atomic.Value
+
+	proxyTransport http.RoundTripper
+
+	// Worker queue that keeps the peerDiscoveryInfoCache up-to-date.
+	peerLeaseQueue workqueue.TypedRateLimitingInterface[string]
+
+	serializer runtime.NegotiatedSerializer
 }
// responder implements rest.Responder for assisting a connector in writing objects or errors. // responder implements rest.Responder for assisting a connector in writing objects or errors.
@ -93,12 +98,22 @@ func (h *peerProxyHandler) HasFinishedSync() bool {
 }
 
 func (h *peerProxyHandler) WaitForCacheSync(stopCh <-chan struct{}) error {
-	ok := cache.WaitForNamedCacheSync("unknown-version-proxy", stopCh, h.storageversionInformer.HasSynced, h.storageversionManager.Completed)
+	ok := cache.WaitForNamedCacheSync("mixed-version-proxy", stopCh, h.apiserverIdentityInformer.Informer().HasSynced)
 	if !ok {
 		return fmt.Errorf("error while waiting for initial cache sync")
 	}
-	klog.V(3).Infof("setting finishedSync to true")
+
+	if !cache.WaitForNamedCacheSync(controllerName, stopCh, h.leaseRegistration.HasSynced) {
+		return fmt.Errorf("error while waiting for peer-identity-lease event handler registration sync")
+	}
+
+	// Wait for localDiscoveryInfoCache to be populated.
+	select {
+	case <-h.localDiscoveryInfoCachePopulated:
+	case <-stopCh:
+		return fmt.Errorf("stop signal received while waiting for local discovery cache population")
+	}
+
 	h.finishedSync.Store(true)
 	return nil
 }
@ -109,7 +124,6 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		ctx := r.Context()
 		requestInfo, ok := apirequest.RequestInfoFrom(ctx)
-
 		if !ok {
 			responsewriters.InternalError(w, r, errors.New("no RequestInfo found in the context"))
 			return
@ -129,10 +143,9 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
 			return
 		}
 
-		// StorageVersion Informers and/or StorageVersionManager is not synced yet, pass request to next handler
+		// Apiserver Identity Informers is not synced yet, pass request to next handler
 		// This will happen for self requests from the kube-apiserver because we have a poststarthook
-		// to ensure that external requests are not served until the StorageVersion Informer and
-		// StorageVersionManager has synced
+		// to ensure that external requests are not served until the ApiserverIdentity Informer has synced
 		if !h.HasFinishedSync() {
 			handler.ServeHTTP(w, r)
 			return
@ -143,15 +156,20 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
gvr.Group = "core" gvr.Group = "core"
} }
apiservers, err := h.findServiceableByServers(gvr) if h.shouldServeLocally(gvr) {
if err != nil {
// resource wasn't found in SV informer cache which means that resource is an aggregated API
// or a CR. This situation is ok to be handled by local handler.
handler.ServeHTTP(w, r) handler.ServeHTTP(w, r)
return return
} }
locallyServiceable, peerEndpoints, err := h.resolveServingLocation(apiservers) // find servers that are capable of serving this request
peerServerIDs := h.findServiceableByPeerFromPeerDiscoveryCache(gvr)
if len(peerServerIDs) == 0 {
klog.Errorf("gvr %v is not served by anything in this cluster", gvr)
handler.ServeHTTP(w, r)
return
}
peerEndpoints, err := h.resolveServingLocation(peerServerIDs)
if err != nil { if err != nil {
gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version} gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
klog.ErrorS(err, "error finding serviceable-by apiservers for the requested resource", "gvr", gvr) klog.ErrorS(err, "error finding serviceable-by apiservers for the requested resource", "gvr", gvr)
@ -159,91 +177,145 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
 			return
 		}
 
-		// pass request to the next handler if found the gvr locally.
-		// TODO: maintain locally serviceable GVRs somewhere so that we dont have to
-		// consult the storageversion-informed map for those
-		if locallyServiceable {
-			handler.ServeHTTP(w, r)
-			return
-		}
-
-		if len(peerEndpoints) == 0 {
-			klog.Errorf("gvr %v is not served by anything in this cluster", gvr)
-			handler.ServeHTTP(w, r)
-			return
-		}
-
-		// otherwise, randomly select an apiserver and proxy request to it
 		rand := rand.Intn(len(peerEndpoints))
-		destServerHostPort := peerEndpoints[rand]
-		h.proxyRequestToDestinationAPIServer(r, w, destServerHostPort)
+		peerEndpoint := peerEndpoints[rand]
+		h.proxyRequestToDestinationAPIServer(r, w, peerEndpoint)
 	})
 }
 
-func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource) (*sync.Map, error) {
-	apiserversi, ok := h.svMap.Load(gvr)
-	if !ok || apiserversi == nil {
-		return nil, fmt.Errorf("no storageVersions found for the GVR: %v", gvr)
-	}
-	apiservers, _ := apiserversi.(*sync.Map)
-	return apiservers, nil
-}
-
-func (h *peerProxyHandler) resolveServingLocation(apiservers *sync.Map) (bool, []string, error) {
-	var peerServerEndpoints []string
-	var locallyServiceable bool
-	var respErr error
-
-	apiservers.Range(func(key, value interface{}) bool {
-		apiserverKey := key.(string)
-		if apiserverKey == h.serverId {
-			locallyServiceable = true
-			// stop iteration and reset any errors encountered so far.
-			respErr = nil
-			return false
-		}
-
-		hostPort, err := h.hostportInfo(apiserverKey)
-		if err != nil {
-			respErr = err
-			// continue with iteration
-			return true
-		}
-
-		peerServerEndpoints = append(peerServerEndpoints, hostPort)
-		return true
-	})
-
-	// reset err if there was atleast one valid peer server found.
-	if len(peerServerEndpoints) > 0 {
-		respErr = nil
-	}
-
-	return locallyServiceable, peerServerEndpoints, respErr
-}
+// RunLocalDiscoveryCacheSync populated the localDiscoveryInfoCache and
+// starts a goroutine to periodically refresh the local discovery cache.
+func (h *peerProxyHandler) RunLocalDiscoveryCacheSync(stopCh <-chan struct{}) error {
+	klog.Info("localDiscoveryCacheInvalidation goroutine started")
+	// Populate the cache initially.
+	if err := h.populateLocalDiscoveryCache(); err != nil {
+		return fmt.Errorf("failed to populate initial local discovery cache: %w", err)
+	}
+	go func() {
+		for {
+			select {
+			case <-h.localDiscoveryCacheTicker.C:
+				klog.V(4).Infof("Invalidating local discovery cache")
+				if err := h.populateLocalDiscoveryCache(); err != nil {
+					klog.Errorf("Failed to repopulate local discovery cache: %v", err)
+				}
+			case <-stopCh:
+				klog.Info("localDiscoveryCacheInvalidation goroutine received stop signal")
+				if h.localDiscoveryCacheTicker != nil {
+					h.localDiscoveryCacheTicker.Stop()
+					klog.Info("localDiscoveryCacheTicker stopped")
+				}
+				klog.Info("localDiscoveryCacheInvalidation goroutine exiting")
+				return
+			}
+		}
+	}()
+	return nil
+}
+
+func (h *peerProxyHandler) populateLocalDiscoveryCache() error {
+	_, resourcesByGV, _, err := h.discoveryClient.GroupsAndMaybeResources()
+	if err != nil {
+		return fmt.Errorf("error getting API group resources from discovery: %w", err)
+	}
+	freshLocalDiscoveryResponse := map[schema.GroupVersionResource]bool{}
+	for gv, resources := range resourcesByGV {
+		if gv.Group == "" {
+			gv.Group = "core"
+		}
+		for _, resource := range resources.APIResources {
+			gvr := gv.WithResource(resource.Name)
+			freshLocalDiscoveryResponse[gvr] = true
+		}
+	}
+	h.localDiscoveryInfoCache.Store(freshLocalDiscoveryResponse)
+	// Signal that the cache has been populated.
+	h.localDiscoveryInfoCachePopulatedOnce.Do(func() {
+		close(h.localDiscoveryInfoCachePopulated)
+	})
+	return nil
+}
+
+// shouldServeLocally checks if the requested resource is present in the local
+// discovery cache indicating the request can be served by this server.
+func (h *peerProxyHandler) shouldServeLocally(gvr schema.GroupVersionResource) bool {
+	cache := h.localDiscoveryInfoCache.Load().(map[schema.GroupVersionResource]bool)
+	exists, ok := cache[gvr]
+	if !ok {
+		klog.V(4).Infof("resource not found for %v in local discovery cache\n", gvr.GroupVersion())
+		return false
+	}
+
+	if exists {
+		return true
+	}
+
+	return false
+}
+
+func (h *peerProxyHandler) findServiceableByPeerFromPeerDiscoveryCache(gvr schema.GroupVersionResource) []string {
+	var serviceableByIDs []string
+	cache := h.peerDiscoveryInfoCache.Load().(map[string]map[schema.GroupVersionResource]bool)
+	for peerID, servedResources := range cache {
+		// Ignore local apiserver.
+		if peerID == h.serverID {
+			continue
+		}
+		exists, ok := servedResources[gvr]
+		if !ok {
+			continue
+		}
+		if exists {
+			serviceableByIDs = append(serviceableByIDs, peerID)
+		}
+	}
+
+	return serviceableByIDs
+}
+
+// resolveServingLocation resolves the host:port addresses for the given peer IDs.
+func (h *peerProxyHandler) resolveServingLocation(peerIDs []string) ([]string, error) {
+	var peerServerEndpoints []string
+	var errs []error
+
+	for _, id := range peerIDs {
+		hostPort, err := h.hostportInfo(id)
+		if err != nil {
+			errs = append(errs, err)
+			continue
+		}
+
+		peerServerEndpoints = append(peerServerEndpoints, hostPort)
+	}
+
+	// reset err if there was atleast one valid peer server found.
+	if len(peerServerEndpoints) > 0 {
+		errs = nil
+	}
+
+	return peerServerEndpoints, errors.Join(errs...)
+}
 
 func (h *peerProxyHandler) hostportInfo(apiserverKey string) (string, error) {
-	hostport, err := h.reconciler.GetEndpoint(apiserverKey)
+	hostPort, err := h.reconciler.GetEndpoint(apiserverKey)
 	if err != nil {
 		return "", err
 	}
-	// check ip format
-	_, _, err = net.SplitHostPort(hostport)
+	_, _, err = net.SplitHostPort(hostPort)
 	if err != nil {
 		return "", err
 	}
-	return hostport, nil
+
+	return hostPort, nil
 }
 
 func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, rw http.ResponseWriter, host string) {
-	user, ok := apirequest.UserFrom(req.Context())
-	if !ok {
-		klog.Error("failed to get user info from request")
-		return
-	}
-
 	// write a new location based on the existing request pointed at the target service
 	location := &url.URL{}
 	location.Scheme = "https"
@ -255,107 +327,29 @@ func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request,
 	newReq.Header.Add(PeerProxiedHeader, "true")
 	defer cancelFn()
 
-	proxyRoundTripper := transport.NewAuthProxyRoundTripper(user.GetName(), user.GetUID(), user.GetGroups(), user.GetExtra(), h.proxyTransport)
+	proxyRoundTripper, err := h.buildProxyRoundtripper(req)
+	if err != nil {
+		klog.Errorf("failed to build proxy round tripper: %v", err)
+		return
+	}
 
 	delegate := &epmetrics.ResponseWriterDelegator{ResponseWriter: rw}
 	w := responsewriter.WrapForHTTP1Or2(delegate)
 	handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, false, &responder{w: w, ctx: req.Context()})
 	handler.ServeHTTP(w, newReq)
 	metrics.IncPeerProxiedRequest(req.Context(), strconv.Itoa(delegate.Status()))
 }
 
+func (h *peerProxyHandler) buildProxyRoundtripper(req *http.Request) (http.RoundTripper, error) {
+	user, ok := apirequest.UserFrom(req.Context())
+	if !ok {
+		return nil, apierrors.NewBadRequest("no user details present in request")
+	}
+
+	return transport.NewAuthProxyRoundTripper(user.GetName(), user.GetUID(), user.GetGroups(), user.GetExtra(), h.proxyTransport), nil
+}
+
 func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
 	klog.ErrorS(err, "Error while proxying request to destination apiserver")
 	http.Error(w, err.Error(), http.StatusServiceUnavailable)
 }
-
-// Adds a storageversion object to SVMap
-func (h *peerProxyHandler) addSV(obj interface{}) {
-	sv, ok := obj.(*v1alpha1.StorageVersion)
-	if !ok {
-		klog.Error("Invalid StorageVersion provided to addSV()")
-		return
-	}
-	h.updateSVMap(nil, sv)
-}
-
-// Updates the SVMap to delete old storageversion and add new storageversion
-func (h *peerProxyHandler) updateSV(oldObj interface{}, newObj interface{}) {
-	oldSV, ok := oldObj.(*v1alpha1.StorageVersion)
-	if !ok {
-		klog.Error("Invalid StorageVersion provided to updateSV()")
-		return
-	}
-	newSV, ok := newObj.(*v1alpha1.StorageVersion)
-	if !ok {
-		klog.Error("Invalid StorageVersion provided to updateSV()")
-		return
-	}
-	h.updateSVMap(oldSV, newSV)
-}
-
-// Deletes a storageversion object from SVMap
-func (h *peerProxyHandler) deleteSV(obj interface{}) {
-	sv, ok := obj.(*v1alpha1.StorageVersion)
-	if !ok {
-		klog.Error("Invalid StorageVersion provided to deleteSV()")
-		return
-	}
-	h.updateSVMap(sv, nil)
-}
-
-// Delete old storageversion, add new storagversion
-func (h *peerProxyHandler) updateSVMap(oldSV *v1alpha1.StorageVersion, newSV *v1alpha1.StorageVersion) {
-	if oldSV != nil {
-		h.deleteSVFromMap(oldSV)
-	}
-	if newSV != nil {
-		h.addSVToMap(newSV)
-	}
-}
-
-func (h *peerProxyHandler) deleteSVFromMap(sv *v1alpha1.StorageVersion) {
-	// The name of storageversion is <group>.<resource>
-	splitInd := strings.LastIndex(sv.Name, ".")
-	group := sv.Name[:splitInd]
-	resource := sv.Name[splitInd+1:]
-
-	gvr := schema.GroupVersionResource{Group: group, Resource: resource}
-	for _, gr := range sv.Status.StorageVersions {
-		for _, version := range gr.ServedVersions {
-			versionSplit := strings.Split(version, "/")
-			if len(versionSplit) == 2 {
-				version = versionSplit[1]
-			}
-			gvr.Version = version
-			h.svMap.Delete(gvr)
-		}
-	}
-}
-
-func (h *peerProxyHandler) addSVToMap(sv *v1alpha1.StorageVersion) {
-	// The name of storageversion is <group>.<resource>
-	splitInd := strings.LastIndex(sv.Name, ".")
-	group := sv.Name[:splitInd]
-	resource := sv.Name[splitInd+1:]
-
-	gvr := schema.GroupVersionResource{Group: group, Resource: resource}
-	for _, gr := range sv.Status.StorageVersions {
-		for _, version := range gr.ServedVersions {
-			// some versions have groups included in them, so get rid of the groups
-			versionSplit := strings.Split(version, "/")
-			if len(versionSplit) == 2 {
-				version = versionSplit[1]
-			}
-			gvr.Version = version
-			apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{})
-			apiservers := apiserversi.(*sync.Map)
-			apiservers.Store(gr.APIServerID, true)
-		}
-	}
-}
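Taken together, the handler changes above implement a simple routing rule: serve the request locally when the local discovery cache lists the GVR, otherwise pick a random peer from the peer discovery cache whose endpoint lease resolves to a host:port. A condensed, hypothetical restatement of that decision (not the handler's actual code):

package peerproxyexample

import (
	"errors"
	"math/rand"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

// pickDestination mirrors, in simplified form, shouldServeLocally,
// findServiceableByPeerFromPeerDiscoveryCache and resolveServingLocation.
// The endpoint callback stands in for the peer endpoint lease reconciler.
func pickDestination(
	local map[schema.GroupVersionResource]bool,
	peers map[string]map[schema.GroupVersionResource]bool,
	endpoint func(serverID string) (string, error),
	gvr schema.GroupVersionResource,
) (hostport string, serveLocally bool, err error) {
	if local[gvr] {
		return "", true, nil
	}

	var candidates []string
	for id, served := range peers {
		if !served[gvr] {
			continue
		}
		hp, err := endpoint(id)
		if err != nil {
			// Skip peers whose endpoint lease cannot be resolved.
			continue
		}
		candidates = append(candidates, hp)
	}
	if len(candidates) == 0 {
		return "", false, errors.New("no peer serves the requested resource")
	}
	return candidates[rand.Intn(len(candidates))], false, nil
}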


@ -19,7 +19,6 @@ package peerproxy
 import (
 	"net/http"
 	"strings"
-	"sync"
 	"testing"
 	"time"
@ -27,42 +26,39 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/apitesting" "k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/reconcilers" "k8s.io/apiserver/pkg/reconcilers"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/peerproxy/metrics" "k8s.io/apiserver/pkg/util/peerproxy/metrics"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport" "k8s.io/client-go/transport"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil" "k8s.io/component-base/metrics/testutil"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
) )
const ( const (
requestTimeout = 30 * time.Second requestTimeout = 30 * time.Second
localServerID = "local-apiserver" localServerID = "local-apiserver"
remoteServerID = "remote-apiserver" remoteServerID1 = "remote-apiserver-1"
remoteServerID2 = "remote-apiserver-2"
) )
type FakeSVMapData struct {
gvr schema.GroupVersionResource
serverIDs []string
}
type server struct { type server struct {
publicIP string publicIP string
serverID string serverID string
@ -76,205 +72,138 @@ type reconciler struct {
 func TestPeerProxy(t *testing.T) {
 	testCases := []struct {
 		desc                 string
-		svdata               FakeSVMapData
 		informerFinishedSync bool
 		requestPath          string
 		peerproxiedHeader    string
-		expectedStatus       int
-		metrics              []string
-		want                 string
 		reconcilerConfig     reconciler
+		localCache           map[schema.GroupVersionResource]bool
+		peerCache            map[string]map[schema.GroupVersionResource]bool
+		wantStatus           int
+		wantMetricsData      string
 	}{
 		{
 			desc:        "allow non resource requests",
 			requestPath: "/foo/bar/baz",
-			expectedStatus: http.StatusOK,
+			wantStatus:  http.StatusOK,
 		},
 		{
 			desc:              "allow if already proxied once",
 			requestPath:       "/api/bar/baz",
-			expectedStatus:    http.StatusOK,
 			peerproxiedHeader: "true",
+			wantStatus:        http.StatusOK,
 		},
 		{
 			desc:                 "allow if unsynced informers",
 			requestPath:          "/api/bar/baz",
-			expectedStatus:       http.StatusOK,
 			informerFinishedSync: false,
+			wantStatus:           http.StatusOK,
 		},
 		{
-			desc:                 "allow if no storage version found",
-			requestPath:          "/api/bar/baz",
-			expectedStatus:       http.StatusOK,
-			informerFinishedSync: true,
+			desc:        "Serve locally if serviceable",
+			requestPath: "/api/foo/bar",
+			localCache: map[schema.GroupVersionResource]bool{
+				{Group: "core", Version: "foo", Resource: "bar"}: true,
+			},
+			wantStatus: http.StatusOK,
 		},
 		{
-			// since if no server id is found, we pass request to next handler
-			//, and our last handler in local chain is an http ok handler
-			desc:                 "200 if no serverid found",
-			requestPath:          "/api/bar/baz",
-			expectedStatus:       http.StatusOK,
+			desc:                 "200 if no appropriate peers found, serve locally",
+			requestPath:          "/api/foo/bar",
 			informerFinishedSync: true,
-			svdata: FakeSVMapData{
-				gvr: schema.GroupVersionResource{
-					Group:    "core",
-					Version:  "bar",
-					Resource: "baz"},
-				serverIDs: []string{}},
+			wantStatus:           http.StatusOK,
 		},
 		{
 			desc:                 "503 if no endpoint fetched from lease",
 			requestPath:          "/api/foo/bar",
-			expectedStatus:       http.StatusServiceUnavailable,
 			informerFinishedSync: true,
-			svdata: FakeSVMapData{
-				gvr: schema.GroupVersionResource{
-					Group:    "core",
-					Version:  "foo",
-					Resource: "bar"},
-				serverIDs: []string{remoteServerID}},
-		},
-		{
-			desc:                 "200 if locally serviceable",
-			requestPath:          "/api/foo/bar",
-			expectedStatus:       http.StatusOK,
-			informerFinishedSync: true,
-			svdata: FakeSVMapData{
-				gvr: schema.GroupVersionResource{
-					Group:    "core",
-					Version:  "foo",
-					Resource: "bar"},
-				serverIDs: []string{localServerID}},
+			peerCache: map[string]map[schema.GroupVersionResource]bool{
+				remoteServerID1: {
+					{Group: "core", Version: "foo", Resource: "bar"}: true,
+				},
+			},
+			wantStatus: http.StatusServiceUnavailable,
 		},
 		{
 			desc:                 "503 unreachable peer bind address",
 			requestPath:          "/api/foo/bar",
-			expectedStatus:       http.StatusServiceUnavailable,
 			informerFinishedSync: true,
-			svdata: FakeSVMapData{
-				gvr: schema.GroupVersionResource{
-					Group:    "core",
-					Version:  "foo",
-					Resource: "bar"},
-				serverIDs: []string{remoteServerID}},
+			peerCache: map[string]map[schema.GroupVersionResource]bool{
+				remoteServerID1: {
+					{Group: "core", Version: "foo", Resource: "bar"}: true,
+				},
+			},
 			reconcilerConfig: reconciler{
 				do: true,
 				servers: []server{
 					{
 						publicIP: "1.2.3.4",
-						serverID: remoteServerID,
+						serverID: remoteServerID1,
 					},
 				},
 			},
-			metrics: []string{
-				"apiserver_rerouted_request_total",
-			},
-			want: `
+			wantStatus: http.StatusServiceUnavailable,
+			wantMetricsData: `
 # HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
 # TYPE apiserver_rerouted_request_total counter
 apiserver_rerouted_request_total{code="503"} 1
 `,
 		},
 		{
-			desc:                 "503 unreachable peer public address",
-			requestPath:          "/api/foo/bar",
-			expectedStatus:       http.StatusServiceUnavailable,
-			informerFinishedSync: true,
-			svdata: FakeSVMapData{
-				gvr: schema.GroupVersionResource{
-					Group:    "core",
-					Version:  "foo",
-					Resource: "bar"},
-				serverIDs: []string{remoteServerID}},
-			reconcilerConfig: reconciler{
-				do: true,
-				servers: []server{
-					{
-						publicIP: "1.2.3.4",
-						serverID: remoteServerID,
-					},
-				},
-			},
-			metrics: []string{
-				"apiserver_rerouted_request_total",
-			},
-			want: `
-# HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
-# TYPE apiserver_rerouted_request_total counter
-apiserver_rerouted_request_total{code="503"} 2
-`,
-		},
-		{
 			desc:                 "503 if one apiserver's endpoint lease wasnt found but another valid (unreachable) apiserver was found",
 			requestPath:          "/api/foo/bar",
-			expectedStatus:       http.StatusServiceUnavailable,
 			informerFinishedSync: true,
-			svdata: FakeSVMapData{
-				gvr: schema.GroupVersionResource{
-					Group:    "core",
-					Version:  "foo",
-					Resource: "bar"},
-				serverIDs: []string{"aggregated-apiserver", remoteServerID}},
+			peerCache: map[string]map[schema.GroupVersionResource]bool{
+				remoteServerID1: {
+					{Group: "core", Version: "foo", Resource: "bar"}: true,
+				},
+				remoteServerID2: {
+					{Group: "core", Version: "foo", Resource: "bar"}: true,
+				},
+			},
 			reconcilerConfig: reconciler{
 				do: true,
 				servers: []server{
 					{
 						publicIP: "1.2.3.4",
-						serverID: remoteServerID,
+						serverID: remoteServerID1,
 					},
 				},
 			},
-		},
-		{
-			desc:                 "503 if all peers had invalid host:port info",
-			requestPath:          "/api/foo/bar",
-			expectedStatus:       http.StatusServiceUnavailable,
-			informerFinishedSync: true,
-			svdata: FakeSVMapData{
-				gvr: schema.GroupVersionResource{
-					Group:    "core",
-					Version:  "foo",
-					Resource: "bar"},
-				serverIDs: []string{"aggregated-apiserver", remoteServerID}},
-			reconcilerConfig: reconciler{
-				do: true,
-				servers: []server{
-					{
-						publicIP: "1[2.4",
-						serverID: "aggregated-apiserver",
-					},
-					{
-						publicIP: "2.4]6",
-						serverID: remoteServerID,
-					},
-				},
-			},
+			wantStatus: http.StatusServiceUnavailable,
+			wantMetricsData: `
+# HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
+# TYPE apiserver_rerouted_request_total counter
+apiserver_rerouted_request_total{code="503"} 1
+`,
 		},
 	}
 
 	metrics.Register()
 	for _, tt := range testCases {
 		t.Run(tt.desc, func(t *testing.T) {
+			defer metrics.Reset()
 			lastHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 				w.Write([]byte("OK"))
 			})
-			reconciler := newFakePeerEndpointReconciler(t)
-			handler := newHandlerChain(t, lastHandler, reconciler, tt.informerFinishedSync, tt.svdata)
+
+			serverIDs := []string{localServerID}
+			for peerID := range tt.peerCache {
+				serverIDs = append(serverIDs, peerID)
+			}
+			fakeReconciler := newFakePeerEndpointReconciler(t)
+			handler := newHandlerChain(t, tt.informerFinishedSync, lastHandler, fakeReconciler, tt.localCache, tt.peerCache)
 			server, requestGetter := createHTTP2ServerWithClient(handler, requestTimeout*2)
 			defer server.Close()
 
 			if tt.reconcilerConfig.do {
-				// need to enable feature flags first
 				featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)
-				featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)
 
-				for _, server := range tt.reconcilerConfig.servers {
-					err := reconciler.UpdateLease(server.serverID,
-						server.publicIP,
+				for _, s := range tt.reconcilerConfig.servers {
+					err := fakeReconciler.UpdateLease(s.serverID,
+						s.publicIP,
 						[]corev1.EndpointPort{{Name: "foo",
 							Port: 8080, Protocol: "TCP"}})
 					if err != nil {
-						t.Fatalf("failed to update peer endpoint lease - %v", err)
+						t.Errorf("Failed to update lease for server %s", s.serverID)
 					}
 				}
 			}
@ -285,17 +214,14 @@ func TestPeerProxy(t *testing.T) {
 			}
 			req.Header.Set(PeerProxiedHeader, tt.peerproxiedHeader)
 
-			resp, err := requestGetter(req)
-			if err != nil {
-				t.Fatalf("unexpected error trying to get the request: %v", err)
-			}
+			resp, _ := requestGetter(req)
 
 			// compare response
-			assert.Equal(t, tt.expectedStatus, resp.StatusCode)
+			assert.Equal(t, tt.wantStatus, resp.StatusCode)
 
 			// compare metric
-			if tt.want != "" {
-				if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.want), tt.metrics...); err != nil {
+			if tt.wantMetricsData != "" {
+				if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.wantMetricsData), []string{"apiserver_rerouted_request_total"}...); err != nil {
 					t.Fatal(err)
 				}
 			}
@ -324,10 +250,12 @@ func newFakePeerEndpointReconciler(t *testing.T) reconcilers.PeerEndpointLeaseRe
 	return reconciler
 }
 
-func newHandlerChain(t *testing.T, handler http.Handler, reconciler reconcilers.PeerEndpointLeaseReconciler, informerFinishedSync bool, svdata FakeSVMapData) http.Handler {
+func newHandlerChain(t *testing.T, informerFinishedSync bool, handler http.Handler,
+	reconciler reconcilers.PeerEndpointLeaseReconciler,
+	localCache map[schema.GroupVersionResource]bool, peerCache map[string]map[schema.GroupVersionResource]bool) http.Handler {
 	// Add peerproxy handler
 	s := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion()
-	peerProxyHandler, err := newFakePeerProxyHandler(reconciler, svdata, localServerID, s)
+	peerProxyHandler, err := newFakePeerProxyHandler(informerFinishedSync, reconciler, localServerID, s, localCache, peerCache)
 	if err != nil {
 		t.Fatalf("Error creating peer proxy handler: %v", err)
 	}
@ -343,36 +271,28 @@ func newHandlerChain(t *testing.T, handler http.Handler, reconciler reconcilers.
 	return handler
 }
 
-func newFakePeerProxyHandler(reconciler reconcilers.PeerEndpointLeaseReconciler, svdata FakeSVMapData, id string, s runtime.NegotiatedSerializer) (*peerProxyHandler, error) {
+func newFakePeerProxyHandler(informerFinishedSync bool,
+	reconciler reconcilers.PeerEndpointLeaseReconciler, id string, s runtime.NegotiatedSerializer,
+	localCache map[schema.GroupVersionResource]bool, peerCache map[string]map[schema.GroupVersionResource]bool) (*peerProxyHandler, error) {
 	clientset := fake.NewSimpleClientset()
 	informerFactory := informers.NewSharedInformerFactory(clientset, 0)
+	leaseInformer := informerFactory.Coordination().V1().Leases()
 	clientConfig := &transport.Config{
 		TLS: transport.TLSConfig{
 			Insecure: false,
 		}}
-	proxyRoundTripper, err := transport.New(clientConfig)
+	loopbackClientConfig := &rest.Config{
+		Host: "localhost:1010",
+	}
+	ppH, err := NewPeerProxyHandler(id, "identity=testserver", leaseInformer, reconciler, s, loopbackClientConfig, clientConfig)
 	if err != nil {
 		return nil, err
 	}
-	ppI := NewPeerProxyHandler(informerFactory, storageversion.NewDefaultManager(), proxyRoundTripper, id, reconciler, s)
-	if testDataExists(svdata.gvr) {
-		ppI.addToStorageVersionMap(svdata.gvr, svdata.serverIDs)
-	}
-	return ppI, nil
-}
-
-func (h *peerProxyHandler) addToStorageVersionMap(gvr schema.GroupVersionResource, serverIDs []string) {
-	apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{})
-	apiservers := apiserversi.(*sync.Map)
-	for _, serverID := range serverIDs {
-		if serverID != "" {
-			apiservers.Store(serverID, true)
-		}
-	}
-}
-
-func testDataExists(gvr schema.GroupVersionResource) bool {
-	return gvr.Group != "" && gvr.Version != "" && gvr.Resource != ""
-}
+	ppH.localDiscoveryInfoCache.Store(localCache)
+	ppH.peerDiscoveryInfoCache.Store(peerCache)
+	ppH.finishedSync.Store(informerFinishedSync)
+	return ppH, nil
+}
 
 func withFakeUser(handler http.Handler) http.Handler {