allow combining API servers
Kubernetes-commit: bccef75d7ae43be333e068748bb6f998dafa6d9d
This commit is contained in:
parent
4620c09303
commit
43ba6dde7a
|
@ -376,6 +376,15 @@ func (c *Config) SkipComplete() completedConfig {
|
|||
// auth, then the caller should create a handler for those endpoints, which delegates the
|
||||
// any unhandled paths to "Handler".
|
||||
func (c completedConfig) New() (*GenericAPIServer, error) {
|
||||
s, err := c.constructServer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c.buildHandlers(s, nil)
|
||||
}
|
||||
|
||||
func (c completedConfig) constructServer() (*GenericAPIServer, error) {
|
||||
if c.Serializer == nil {
|
||||
return nil, fmt.Errorf("Genericapiserver.New() called with config.Serializer == nil")
|
||||
}
|
||||
|
@ -383,6 +392,8 @@ func (c completedConfig) New() (*GenericAPIServer, error) {
|
|||
return nil, fmt.Errorf("Genericapiserver.New() called with config.LoopbackClientConfig == nil")
|
||||
}
|
||||
|
||||
handlerContainer := mux.NewAPIContainer(http.NewServeMux(), c.Serializer, c.FallThroughHandler)
|
||||
|
||||
s := &GenericAPIServer{
|
||||
discoveryAddresses: c.DiscoveryAddresses,
|
||||
LoopbackClientConfig: c.LoopbackClientConfig,
|
||||
|
@ -399,8 +410,11 @@ func (c completedConfig) New() (*GenericAPIServer, error) {
|
|||
|
||||
apiGroupsForDiscovery: map[string]metav1.APIGroup{},
|
||||
|
||||
HandlerContainer: handlerContainer,
|
||||
FallThroughHandler: c.FallThroughHandler,
|
||||
|
||||
listedPathProvider: routes.ListedPathProviders{handlerContainer, c.FallThroughHandler},
|
||||
|
||||
swaggerConfig: c.SwaggerConfig,
|
||||
openAPIConfig: c.OpenAPIConfig,
|
||||
|
||||
|
@ -408,8 +422,48 @@ func (c completedConfig) New() (*GenericAPIServer, error) {
|
|||
healthzChecks: c.HealthzChecks,
|
||||
}
|
||||
|
||||
s.HandlerContainer = mux.NewAPIContainer(http.NewServeMux(), c.Serializer, s.FallThroughHandler)
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// NewWithDelegate creates a new server which logically combines the handling chain with the passed server.
|
||||
func (c completedConfig) NewWithDelegate(delegationTarget DelegationTarget) (*GenericAPIServer, error) {
|
||||
// some pieces of the delegationTarget take precendence. Callers should already have ensured that these
|
||||
// were wired correctly. Documenting them here.
|
||||
// c.RequestContextMapper = delegationTarget.RequestContextMapper()
|
||||
|
||||
s, err := c.constructServer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for k, v := range delegationTarget.PostStartHooks() {
|
||||
s.postStartHooks[k] = v
|
||||
}
|
||||
|
||||
for _, delegateCheck := range delegationTarget.HealthzChecks() {
|
||||
skip := false
|
||||
for _, existingCheck := range c.HealthzChecks {
|
||||
if existingCheck.Name() == delegateCheck.Name() {
|
||||
skip = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
|
||||
s.healthzChecks = append(s.healthzChecks, delegateCheck)
|
||||
}
|
||||
|
||||
s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget}
|
||||
|
||||
// use the UnprotectedHandler from the delegation target to ensure that we don't attempt to double authenticator, authorize,
|
||||
// or some other part of the filter chain in delegation cases.
|
||||
return c.buildHandlers(s, delegationTarget.UnprotectedHandler())
|
||||
}
|
||||
|
||||
// buildHandlers builds our handling chain
|
||||
func (c completedConfig) buildHandlers(s *GenericAPIServer, delegate http.Handler) (*GenericAPIServer, error) {
|
||||
if s.openAPIConfig != nil {
|
||||
if s.openAPIConfig.Info == nil {
|
||||
s.openAPIConfig.Info = &spec.Info{}
|
||||
|
@ -423,7 +477,7 @@ func (c completedConfig) New() (*GenericAPIServer, error) {
|
|||
}
|
||||
}
|
||||
|
||||
s.installAPI(c.Config)
|
||||
installAPI(s, c.Config, delegate)
|
||||
|
||||
s.Handler, s.InsecureHandler = c.BuildHandlerChainsFunc(s.HandlerContainer.ServeMux, c.Config)
|
||||
|
||||
|
@ -454,9 +508,15 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) (secure, insec
|
|||
return generic(protect(apiHandler)), generic(audit(apiHandler))
|
||||
}
|
||||
|
||||
func (s *GenericAPIServer) installAPI(c *Config) {
|
||||
if c.EnableIndex {
|
||||
routes.Index{}.Install(s.HandlerContainer, s.FallThroughHandler)
|
||||
func installAPI(s *GenericAPIServer, c *Config, delegate http.Handler) {
|
||||
switch {
|
||||
case c.EnableIndex:
|
||||
routes.Index{}.Install(s.listedPathProvider, c.FallThroughHandler, delegate)
|
||||
|
||||
case delegate != nil:
|
||||
// if we have a delegate, allow it to handle everything that's unmatched even if
|
||||
// the index is disabled.
|
||||
s.FallThroughHandler.UnlistedHandleFunc("/", delegate.ServeHTTP)
|
||||
}
|
||||
if c.SwaggerConfig != nil && c.EnableSwaggerUI {
|
||||
routes.SwaggerUI{}.Install(s.FallThroughHandler)
|
||||
|
|
|
@ -0,0 +1,152 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/http/httputil"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/apiserver/pkg/server/mux"
|
||||
"k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
func TestNewWithDelegate(t *testing.T) {
|
||||
delegateConfig := NewConfig().WithSerializer(codecs)
|
||||
delegateConfig.PublicAddress = net.ParseIP("192.168.10.4")
|
||||
delegateConfig.RequestContextMapper = genericapirequest.NewRequestContextMapper()
|
||||
delegateConfig.LegacyAPIGroupPrefixes = sets.NewString("/api")
|
||||
delegateConfig.LoopbackClientConfig = &rest.Config{}
|
||||
delegateConfig.FallThroughHandler = mux.NewPathRecorderMux()
|
||||
delegateConfig.SwaggerConfig = DefaultSwaggerConfig()
|
||||
|
||||
delegateHealthzCalled := false
|
||||
delegateConfig.HealthzChecks = append(delegateConfig.HealthzChecks, healthz.NamedCheck("delegate-health", func(r *http.Request) error {
|
||||
delegateHealthzCalled = true
|
||||
return fmt.Errorf("delegate failed healthcheck")
|
||||
}))
|
||||
|
||||
delegateConfig.FallThroughHandler.HandleFunc("/foo", func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusForbidden)
|
||||
})
|
||||
|
||||
delegateServer, err := delegateConfig.SkipComplete().New()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
delegateServer.AddPostStartHook("delegate-post-start-hook", func(context PostStartHookContext) error {
|
||||
return nil
|
||||
})
|
||||
|
||||
// this wires up swagger
|
||||
delegateServer.PrepareRun()
|
||||
|
||||
wrappingConfig := NewConfig().WithSerializer(codecs)
|
||||
wrappingConfig.PublicAddress = net.ParseIP("192.168.10.4")
|
||||
wrappingConfig.RequestContextMapper = genericapirequest.NewRequestContextMapper()
|
||||
wrappingConfig.LegacyAPIGroupPrefixes = sets.NewString("/api")
|
||||
wrappingConfig.LoopbackClientConfig = &rest.Config{}
|
||||
wrappingConfig.FallThroughHandler = mux.NewPathRecorderMux()
|
||||
wrappingConfig.SwaggerConfig = DefaultSwaggerConfig()
|
||||
|
||||
wrappingHealthzCalled := false
|
||||
wrappingConfig.HealthzChecks = append(wrappingConfig.HealthzChecks, healthz.NamedCheck("wrapping-health", func(r *http.Request) error {
|
||||
wrappingHealthzCalled = true
|
||||
return fmt.Errorf("wrapping failed healthcheck")
|
||||
}))
|
||||
|
||||
wrappingConfig.FallThroughHandler.HandleFunc("/bar", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
})
|
||||
|
||||
wrappingServer, err := wrappingConfig.Complete().NewWithDelegate(delegateServer)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
wrappingServer.AddPostStartHook("wrapping-post-start-hook", func(context PostStartHookContext) error {
|
||||
return nil
|
||||
})
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
wrappingServer.PrepareRun()
|
||||
wrappingServer.RunPostStartHooks()
|
||||
|
||||
server := httptest.NewServer(wrappingServer.Handler)
|
||||
defer server.Close()
|
||||
|
||||
checkPath(server.URL, http.StatusOK, `{
|
||||
"paths": [
|
||||
"/apis",
|
||||
"/bar",
|
||||
"/foo",
|
||||
"/healthz",
|
||||
"/healthz/delegate-health",
|
||||
"/healthz/ping",
|
||||
"/healthz/poststarthook/delegate-post-start-hook",
|
||||
"/healthz/poststarthook/wrapping-post-start-hook",
|
||||
"/healthz/wrapping-health",
|
||||
"/swaggerapi/"
|
||||
]
|
||||
}`, t)
|
||||
checkPath(server.URL+"/healthz", http.StatusInternalServerError, `[+]ping ok
|
||||
[-]wrapping-health failed: reason withheld
|
||||
[-]delegate-health failed: reason withheld
|
||||
[+]poststarthook/delegate-post-start-hook ok
|
||||
[+]poststarthook/wrapping-post-start-hook ok
|
||||
healthz check failed
|
||||
`, t)
|
||||
|
||||
checkPath(server.URL+"/healthz/delegate-health", http.StatusInternalServerError, `internal server error: delegate failed healthcheck
|
||||
`, t)
|
||||
checkPath(server.URL+"/healthz/wrapping-health", http.StatusInternalServerError, `internal server error: wrapping failed healthcheck
|
||||
`, t)
|
||||
checkPath(server.URL+"/healthz/poststarthook/delegate-post-start-hook", http.StatusOK, `ok`, t)
|
||||
checkPath(server.URL+"/healthz/poststarthook/wrapping-post-start-hook", http.StatusOK, `ok`, t)
|
||||
checkPath(server.URL+"/foo", http.StatusForbidden, ``, t)
|
||||
checkPath(server.URL+"/bar", http.StatusUnauthorized, ``, t)
|
||||
}
|
||||
|
||||
func checkPath(url string, expectedStatusCode int, expectedBody string, t *testing.T) {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
dump, _ := httputil.DumpResponse(resp, true)
|
||||
t.Log(string(dump))
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if e, a := expectedBody, string(body); e != a {
|
||||
t.Errorf("%q expected %v, got %v", url, e, a)
|
||||
}
|
||||
if e, a := expectedStatusCode, resp.StatusCode; e != a {
|
||||
t.Errorf("%q expected %v, got %v", url, e, a)
|
||||
}
|
||||
}
|
|
@ -130,6 +130,9 @@ type GenericAPIServer struct {
|
|||
// It comes after all filters and the API handling
|
||||
FallThroughHandler *mux.PathRecorderMux
|
||||
|
||||
// listedPathProvider is a lister which provides the set of paths to show at /
|
||||
listedPathProvider routes.ListedPathProvider
|
||||
|
||||
// Map storing information about all groups to be exposed in discovery response.
|
||||
// The map is from name to the group.
|
||||
apiGroupsForDiscoveryLock sync.RWMutex
|
||||
|
@ -152,6 +155,39 @@ type GenericAPIServer struct {
|
|||
healthzCreated bool
|
||||
}
|
||||
|
||||
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
|
||||
// as expected.
|
||||
type DelegationTarget interface {
|
||||
// UnprotectedHandler returns a handler that is NOT protected by a normal chain
|
||||
UnprotectedHandler() http.Handler
|
||||
|
||||
// RequestContextMapper returns the existing RequestContextMapper. Because we cannot rewire all existing
|
||||
// uses of this function, this will be used in any delegating API server
|
||||
RequestContextMapper() apirequest.RequestContextMapper
|
||||
|
||||
// PostStartHooks returns the post-start hooks that need to be combined
|
||||
PostStartHooks() map[string]postStartHookEntry
|
||||
|
||||
// HealthzChecks returns the healthz checks that need to be combined
|
||||
HealthzChecks() []healthz.HealthzChecker
|
||||
|
||||
// ListedPaths returns the paths for supporting an index
|
||||
ListedPaths() []string
|
||||
}
|
||||
|
||||
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
|
||||
return s.HandlerContainer.ServeMux
|
||||
}
|
||||
func (s *GenericAPIServer) PostStartHooks() map[string]postStartHookEntry {
|
||||
return s.postStartHooks
|
||||
}
|
||||
func (s *GenericAPIServer) HealthzChecks() []healthz.HealthzChecker {
|
||||
return s.healthzChecks
|
||||
}
|
||||
func (s *GenericAPIServer) ListedPaths() []string {
|
||||
return s.listedPathProvider.ListedPaths()
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Send correct mime type for .svg files.
|
||||
// TODO: remove when https://github.com/golang/go/commit/21e47d831bafb59f22b1ea8098f709677ec8ce33
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"fmt"
|
||||
"net/http"
|
||||
rt "runtime"
|
||||
"sort"
|
||||
|
||||
"github.com/emicklei/go-restful"
|
||||
"github.com/golang/glog"
|
||||
|
@ -58,6 +59,18 @@ func NewAPIContainer(mux *http.ServeMux, s runtime.NegotiatedSerializer, default
|
|||
return &c
|
||||
}
|
||||
|
||||
// ListedPaths returns the paths of the webservices for listing on /.
|
||||
func (c *APIContainer) ListedPaths() []string {
|
||||
var handledPaths []string
|
||||
// Extract the paths handled using restful.WebService
|
||||
for _, ws := range c.RegisteredWebServices() {
|
||||
handledPaths = append(handledPaths, ws.RootPath())
|
||||
}
|
||||
sort.Strings(handledPaths)
|
||||
|
||||
return handledPaths
|
||||
}
|
||||
|
||||
//TODO: Unify with RecoverPanics?
|
||||
func logStackOnRecover(s runtime.NegotiatedSerializer, panicReason interface{}, w http.ResponseWriter) {
|
||||
var buffer bytes.Buffer
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"fmt"
|
||||
"net/http"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
)
|
||||
|
@ -42,9 +43,12 @@ func NewPathRecorderMux() *PathRecorderMux {
|
|||
}
|
||||
}
|
||||
|
||||
// HandledPaths returns the registered handler exposedPaths.
|
||||
func (m *PathRecorderMux) HandledPaths() []string {
|
||||
return append([]string{}, m.exposedPaths...)
|
||||
// ListedPaths returns the registered handler exposedPaths.
|
||||
func (m *PathRecorderMux) ListedPaths() []string {
|
||||
handledPaths := append([]string{}, m.exposedPaths...)
|
||||
sort.Strings(handledPaths)
|
||||
|
||||
return handledPaths
|
||||
}
|
||||
|
||||
// Handle registers the handler for the given pattern.
|
||||
|
|
|
@ -27,6 +27,6 @@ func TestSecretHandlers(t *testing.T) {
|
|||
c := NewPathRecorderMux()
|
||||
c.UnlistedHandleFunc("/secret", func(http.ResponseWriter, *http.Request) {})
|
||||
c.HandleFunc("/nonswagger", func(http.ResponseWriter, *http.Request) {})
|
||||
assert.NotContains(t, c.HandledPaths(), "/secret")
|
||||
assert.Contains(t, c.HandledPaths(), "/nonswagger")
|
||||
assert.NotContains(t, c.ListedPaths(), "/secret")
|
||||
assert.Contains(t, c.ListedPaths(), "/nonswagger")
|
||||
}
|
||||
|
|
|
@ -18,33 +18,52 @@ package routes
|
|||
|
||||
import (
|
||||
"net/http"
|
||||
"sort"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||
"k8s.io/apiserver/pkg/server/mux"
|
||||
)
|
||||
|
||||
// ListedPathProvider is an interface for providing paths that should be reported at /.
|
||||
type ListedPathProvider interface {
|
||||
// ListedPaths is an alphabetically sorted list of paths to be reported at /.
|
||||
ListedPaths() []string
|
||||
}
|
||||
|
||||
// ListedPathProviders is a convenient way to combine multiple ListedPathProviders
|
||||
type ListedPathProviders []ListedPathProvider
|
||||
|
||||
// ListedPaths unions and sorts the included paths.
|
||||
func (p ListedPathProviders) ListedPaths() []string {
|
||||
ret := sets.String{}
|
||||
for _, provider := range p {
|
||||
for _, path := range provider.ListedPaths() {
|
||||
ret.Insert(path)
|
||||
}
|
||||
}
|
||||
|
||||
return ret.List()
|
||||
}
|
||||
|
||||
// Index provides a webservice for the http root / listing all known paths.
|
||||
type Index struct{}
|
||||
|
||||
// Install adds the Index webservice to the given mux.
|
||||
func (i Index) Install(c *mux.APIContainer, mux *mux.PathRecorderMux) {
|
||||
func (i Index) Install(pathProvider ListedPathProvider, mux *mux.PathRecorderMux, delegate http.Handler) {
|
||||
mux.UnlistedHandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
status := http.StatusOK
|
||||
if r.URL.Path != "/" && r.URL.Path != "/index.html" {
|
||||
// Since "/" matches all paths, handleIndex is called for all paths for which there is no handler api.Registry.
|
||||
// We want to return a 404 status with a list of all valid paths, incase of an invalid URL request.
|
||||
// if we have a delegate, we should call to it and simply return
|
||||
if delegate != nil {
|
||||
delegate.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
// If we have no delegate, we want to return a 404 status with a list of all valid paths, incase of an invalid URL request.
|
||||
status = http.StatusNotFound
|
||||
}
|
||||
var handledPaths []string
|
||||
// Extract the paths handled using restful.WebService
|
||||
for _, ws := range c.RegisteredWebServices() {
|
||||
handledPaths = append(handledPaths, ws.RootPath())
|
||||
}
|
||||
// Extract the paths handled using mux handler.
|
||||
handledPaths = append(handledPaths, mux.HandledPaths()...)
|
||||
sort.Strings(handledPaths)
|
||||
responsewriters.WriteRawJSON(status, metav1.RootPaths{Paths: handledPaths}, w)
|
||||
responsewriters.WriteRawJSON(status, metav1.RootPaths{Paths: pathProvider.ListedPaths()}, w)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue