move namespace lifecycle plugin to apiserver

Kubernetes-commit: 1a5da9afc804eed6630caa1a17540d1a171b211a
This commit is contained in:
p0lyn0mial 2017-05-25 20:32:43 +02:00 committed by Kubernetes Publisher
parent 6794013a5b
commit d3a026ac63
3 changed files with 567 additions and 0 deletions

View File

@ -0,0 +1,65 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["admission.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/client/listers/core/internalversion:go_default_library",
"//pkg/kubeapiserver/admission:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/cache:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["admission_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/kubeapiserver/admission:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -0,0 +1,225 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lifecycle
import (
"fmt"
"io"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilcache "k8s.io/apimachinery/pkg/util/cache"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
)
const (
// Name of admission plug-in
PluginName = "NamespaceLifecycle"
// how long a namespace stays in the force live lookup cache before expiration.
forceLiveLookupTTL = 30 * time.Second
// how long to wait for a missing namespace before re-checking the cache (and then doing a live lookup)
// this accomplishes two things:
// 1. It allows a watch-fed cache time to observe a namespace creation event
// 2. It allows time for a namespace creation to distribute to members of a storage cluster,
// so the live lookup has a better chance of succeeding even if it isn't performed against the leader.
missingNamespaceWait = 50 * time.Millisecond
)
// Register registers a plugin
func Register(plugins *admission.Plugins) {
plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
return NewLifecycle(sets.NewString(metav1.NamespaceDefault, metav1.NamespaceSystem, metav1.NamespacePublic))
})
}
// lifecycle is an implementation of admission.Interface.
// It enforces life-cycle constraints around a Namespace depending on its Phase
type lifecycle struct {
*admission.Handler
client internalclientset.Interface
immortalNamespaces sets.String
namespaceLister corelisters.NamespaceLister
// forceLiveLookupCache holds a list of entries for namespaces that we have a strong reason to believe are stale in our local cache.
// if a namespace is in this cache, then we will ignore our local state and always fetch latest from api server.
forceLiveLookupCache *utilcache.LRUExpireCache
}
type forceLiveLookupEntry struct {
expiry time.Time
}
var _ = kubeapiserveradmission.WantsInternalKubeInformerFactory(&lifecycle{})
var _ = kubeapiserveradmission.WantsInternalKubeClientSet(&lifecycle{})
func makeNamespaceKey(namespace string) *api.Namespace {
return &api.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
Namespace: "",
},
}
}
func (l *lifecycle) Admit(a admission.Attributes) error {
// prevent deletion of immortal namespaces
if a.GetOperation() == admission.Delete && a.GetKind().GroupKind() == api.Kind("Namespace") && l.immortalNamespaces.Has(a.GetName()) {
return errors.NewForbidden(a.GetResource().GroupResource(), a.GetName(), fmt.Errorf("this namespace may not be deleted"))
}
// if we're here, then we've already passed authentication, so we're allowed to do what we're trying to do
// if we're here, then the API server has found a route, which means that if we have a non-empty namespace
// its a namespaced resource.
if len(a.GetNamespace()) == 0 || a.GetKind().GroupKind() == api.Kind("Namespace") {
// if a namespace is deleted, we want to prevent all further creates into it
// while it is undergoing termination. to reduce incidences where the cache
// is slow to update, we add the namespace into a force live lookup list to ensure
// we are not looking at stale state.
if a.GetOperation() == admission.Delete {
l.forceLiveLookupCache.Add(a.GetName(), true, forceLiveLookupTTL)
}
return nil
}
// always allow access review checks. Returning status about the namespace would be leaking information
if isAccessReview(a) {
return nil
}
// we need to wait for our caches to warm
if !l.WaitForReady() {
return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
}
var (
exists bool
err error
)
namespace, err := l.namespaceLister.Get(a.GetNamespace())
if err != nil {
if !errors.IsNotFound(err) {
return errors.NewInternalError(err)
}
} else {
exists = true
}
if !exists && a.GetOperation() == admission.Create {
// give the cache time to observe the namespace before rejecting a create.
// this helps when creating a namespace and immediately creating objects within it.
time.Sleep(missingNamespaceWait)
namespace, err = l.namespaceLister.Get(a.GetNamespace())
switch {
case errors.IsNotFound(err):
// no-op
case err != nil:
return errors.NewInternalError(err)
default:
exists = true
}
if exists {
glog.V(4).Infof("found %s in cache after waiting", a.GetNamespace())
}
}
// forceLiveLookup if true will skip looking at local cache state and instead always make a live call to server.
forceLiveLookup := false
if _, ok := l.forceLiveLookupCache.Get(a.GetNamespace()); ok {
// we think the namespace was marked for deletion, but our current local cache says otherwise, we will force a live lookup.
forceLiveLookup = exists && namespace.Status.Phase == api.NamespaceActive
}
// refuse to operate on non-existent namespaces
if !exists || forceLiveLookup {
// as a last resort, make a call directly to storage
namespace, err = l.client.Core().Namespaces().Get(a.GetNamespace(), metav1.GetOptions{})
switch {
case errors.IsNotFound(err):
return err
case err != nil:
return errors.NewInternalError(err)
}
glog.V(4).Infof("found %s via storage lookup", a.GetNamespace())
}
// ensure that we're not trying to create objects in terminating namespaces
if a.GetOperation() == admission.Create {
if namespace.Status.Phase != api.NamespaceTerminating {
return nil
}
// TODO: This should probably not be a 403
return admission.NewForbidden(a, fmt.Errorf("unable to create new content in namespace %s because it is being terminated.", a.GetNamespace()))
}
return nil
}
// NewLifecycle creates a new namespace lifecycle admission control handler
func NewLifecycle(immortalNamespaces sets.String) (admission.Interface, error) {
return newLifecycleWithClock(immortalNamespaces, clock.RealClock{})
}
func newLifecycleWithClock(immortalNamespaces sets.String, clock utilcache.Clock) (admission.Interface, error) {
forceLiveLookupCache := utilcache.NewLRUExpireCacheWithClock(100, clock)
return &lifecycle{
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
immortalNamespaces: immortalNamespaces,
forceLiveLookupCache: forceLiveLookupCache,
}, nil
}
func (l *lifecycle) SetInternalKubeInformerFactory(f informers.SharedInformerFactory) {
namespaceInformer := f.Core().InternalVersion().Namespaces()
l.namespaceLister = namespaceInformer.Lister()
l.SetReadyFunc(namespaceInformer.Informer().HasSynced)
}
func (l *lifecycle) SetInternalKubeClientSet(client internalclientset.Interface) {
l.client = client
}
func (l *lifecycle) Validate() error {
if l.namespaceLister == nil {
return fmt.Errorf("missing namespaceLister")
}
if l.client == nil {
return fmt.Errorf("missing client")
}
return nil
}
// accessReviewResources are resources which give a view into permissions in a namespace. Users must be allowed to create these
// resources because returning "not found" errors allows someone to search for the "people I'm going to fire in 2017" namespace.
var accessReviewResources = map[schema.GroupResource]bool{
{Group: "authorization.k8s.io", Resource: "localsubjectaccessreviews"}: true,
}
func isAccessReview(a admission.Attributes) bool {
return accessReviewResources[a.GetResource().GroupResource()]
}

View File

@ -0,0 +1,277 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lifecycle
import (
"fmt"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
)
// newHandlerForTest returns a configured handler for testing.
func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
return newHandlerForTestWithClock(c, clock.RealClock{})
}
// newHandlerForTestWithClock returns a configured handler for testing.
func newHandlerForTestWithClock(c clientset.Interface, cacheClock clock.Clock) (admission.Interface, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler, err := newLifecycleWithClock(sets.NewString(metav1.NamespaceDefault, metav1.NamespaceSystem), cacheClock)
if err != nil {
return nil, f, err
}
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil, nil, nil)
pluginInitializer.Initialize(handler)
err = admission.Validate(handler)
return handler, f, err
}
// newMockClientForTest creates a mock client that returns a client configured for the specified list of namespaces with the specified phase.
func newMockClientForTest(namespaces map[string]api.NamespacePhase) *fake.Clientset {
mockClient := &fake.Clientset{}
mockClient.AddReactor("list", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
namespaceList := &api.NamespaceList{
ListMeta: metav1.ListMeta{
ResourceVersion: fmt.Sprintf("%d", len(namespaces)),
},
}
index := 0
for name, phase := range namespaces {
namespaceList.Items = append(namespaceList.Items, api.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: name,
ResourceVersion: fmt.Sprintf("%d", index),
},
Status: api.NamespaceStatus{
Phase: phase,
},
})
index++
}
return true, namespaceList, nil
})
return mockClient
}
// newPod returns a new pod for the specified namespace
func newPod(namespace string) api.Pod {
return api.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "123", Namespace: namespace},
Spec: api.PodSpec{
Volumes: []api.Volume{{Name: "vol"}},
Containers: []api.Container{{Name: "ctr", Image: "image"}},
},
}
}
func TestAccessReviewCheckOnMissingNamespace(t *testing.T) {
namespace := "test"
mockClient := newMockClientForTest(map[string]api.NamespacePhase{})
mockClient.AddReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("nope, out of luck")
})
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Errorf("unexpected error initializing handler: %v", err)
}
informerFactory.Start(wait.NeverStop)
err = handler.Admit(admission.NewAttributesRecord(nil, nil, schema.GroupVersionKind{Group: "authorization.k8s.io", Version: "v1", Kind: "LocalSubjectAccesReview"}, namespace, "", schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1", Resource: "localsubjectaccessreviews"}, "", admission.Create, nil))
if err != nil {
t.Error(err)
}
}
// TestAdmissionNamespaceDoesNotExist verifies pod is not admitted if namespace does not exist.
func TestAdmissionNamespaceDoesNotExist(t *testing.T) {
namespace := "test"
mockClient := newMockClientForTest(map[string]api.NamespacePhase{})
mockClient.AddReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("nope, out of luck")
})
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Errorf("unexpected error initializing handler: %v", err)
}
informerFactory.Start(wait.NeverStop)
pod := newPod(namespace)
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err == nil {
actions := ""
for _, action := range mockClient.Actions() {
actions = actions + action.GetVerb() + ":" + action.GetResource().Resource + ":" + action.GetSubresource() + ", "
}
t.Errorf("expected error returned from admission handler: %v", actions)
}
}
// TestAdmissionNamespaceActive verifies a resource is admitted when the namespace is active.
func TestAdmissionNamespaceActive(t *testing.T) {
namespace := "test"
mockClient := newMockClientForTest(map[string]api.NamespacePhase{
namespace: api.NamespaceActive,
})
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Errorf("unexpected error initializing handler: %v", err)
}
informerFactory.Start(wait.NeverStop)
pod := newPod(namespace)
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err != nil {
t.Errorf("unexpected error returned from admission handler")
}
}
// TestAdmissionNamespaceTerminating verifies a resource is not created when the namespace is active.
func TestAdmissionNamespaceTerminating(t *testing.T) {
namespace := "test"
mockClient := newMockClientForTest(map[string]api.NamespacePhase{
namespace: api.NamespaceTerminating,
})
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Errorf("unexpected error initializing handler: %v", err)
}
informerFactory.Start(wait.NeverStop)
pod := newPod(namespace)
// verify create operations in the namespace cause an error
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err == nil {
t.Errorf("Expected error rejecting creates in a namespace when it is terminating")
}
// verify update operations in the namespace can proceed
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Update, nil))
if err != nil {
t.Errorf("Unexpected error returned from admission handler: %v", err)
}
// verify delete operations in the namespace can proceed
err = handler.Admit(admission.NewAttributesRecord(nil, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Delete, nil))
if err != nil {
t.Errorf("Unexpected error returned from admission handler: %v", err)
}
// verify delete of namespace default can never proceed
err = handler.Admit(admission.NewAttributesRecord(nil, nil, api.Kind("Namespace").WithVersion("version"), "", metav1.NamespaceDefault, api.Resource("namespaces").WithVersion("version"), "", admission.Delete, nil))
if err == nil {
t.Errorf("Expected an error that this namespace can never be deleted")
}
// verify delete of namespace other than default can proceed
err = handler.Admit(admission.NewAttributesRecord(nil, nil, api.Kind("Namespace").WithVersion("version"), "", "other", api.Resource("namespaces").WithVersion("version"), "", admission.Delete, nil))
if err != nil {
t.Errorf("Did not expect an error %v", err)
}
}
// TestAdmissionNamespaceForceLiveLookup verifies live lookups are done after deleting a namespace
func TestAdmissionNamespaceForceLiveLookup(t *testing.T) {
namespace := "test"
getCalls := int64(0)
phases := map[string]api.NamespacePhase{namespace: api.NamespaceActive}
mockClient := newMockClientForTest(phases)
mockClient.AddReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
getCalls++
return true, &api.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}, Status: api.NamespaceStatus{Phase: phases[namespace]}}, nil
})
fakeClock := clock.NewFakeClock(time.Now())
handler, informerFactory, err := newHandlerForTestWithClock(mockClient, fakeClock)
if err != nil {
t.Errorf("unexpected error initializing handler: %v", err)
}
informerFactory.Start(wait.NeverStop)
pod := newPod(namespace)
// verify create operations in the namespace is allowed
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err != nil {
t.Errorf("Unexpected error rejecting creates in an active namespace")
}
if getCalls != 0 {
t.Errorf("Expected no live lookups of the namespace, got %d", getCalls)
}
getCalls = 0
// verify delete of namespace can proceed
err = handler.Admit(admission.NewAttributesRecord(nil, nil, api.Kind("Namespace").WithVersion("version"), "", namespace, api.Resource("namespaces").WithVersion("version"), "", admission.Delete, nil))
if err != nil {
t.Errorf("Expected namespace deletion to be allowed")
}
if getCalls != 0 {
t.Errorf("Expected no live lookups of the namespace, got %d", getCalls)
}
getCalls = 0
// simulate the phase changing
phases[namespace] = api.NamespaceTerminating
// verify create operations in the namespace cause an error
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err == nil {
t.Errorf("Expected error rejecting creates in a namespace right after deleting it")
}
if getCalls != 1 {
t.Errorf("Expected a live lookup of the namespace at t=0, got %d", getCalls)
}
getCalls = 0
// Ensure the live lookup is still forced up to forceLiveLookupTTL
fakeClock.Step(forceLiveLookupTTL)
// verify create operations in the namespace cause an error
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err == nil {
t.Errorf("Expected error rejecting creates in a namespace right after deleting it")
}
if getCalls != 1 {
t.Errorf("Expected a live lookup of the namespace at t=forceLiveLookupTTL, got %d", getCalls)
}
getCalls = 0
// Ensure the live lookup expires
fakeClock.Step(time.Millisecond)
// verify create operations in the namespace don't force a live lookup after the timeout
handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if getCalls != 0 {
t.Errorf("Expected no live lookup of the namespace at t=forceLiveLookupTTL+1ms, got %d", getCalls)
}
getCalls = 0
}