remove dynamic audit

Kubernetes-commit: ed4e6f10265ae32b1c2c0b254a4d2c20590cfadd
This commit is contained in:
David Eads 2020-05-27 14:04:09 -04:00 committed by Kubernetes Publisher
parent 64913bcbc2
commit a3201bc883
13 changed files with 8 additions and 1455 deletions

View File

@ -59,13 +59,6 @@ const (
// audited.
AdvancedAuditing featuregate.Feature = "AdvancedAuditing"
// owner: @pbarker
// alpha: v1.13
//
// DynamicAuditing enables configuration of audit policy and webhook backends through an
// AuditSink API object.
DynamicAuditing featuregate.Feature = "DynamicAuditing"
// owner: @ilackams
// alpha: v1.7
//
@ -163,7 +156,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
DynamicAuditing: {Default: false, PreRelease: featuregate.Alpha},
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
DryRun: {Default: true, PreRelease: featuregate.GA},

View File

@ -27,7 +27,6 @@ import (
"gopkg.in/natefinch/lumberjack.v2"
"k8s.io/klog/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
@ -36,20 +35,12 @@ import (
auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/audit/policy"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
utilfeature "k8s.io/apiserver/pkg/util/feature"
pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
plugindynamic "k8s.io/apiserver/plugin/pkg/audit/dynamic"
pluginenforced "k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced"
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
plugintruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
)
const (
@ -80,7 +71,6 @@ type AuditOptions struct {
// Plugin options
LogOptions AuditLogOptions
WebhookOptions AuditWebhookOptions
DynamicOptions AuditDynamicOptions
}
const (
@ -180,10 +170,6 @@ func NewAuditOptions() *AuditOptions {
TruncateOptions: NewAuditTruncateOptions(),
GroupVersionString: "audit.k8s.io/v1",
},
DynamicOptions: AuditDynamicOptions{
Enabled: false,
BatchConfig: plugindynamic.NewDefaultWebhookBatchConfig(),
},
}
}
@ -206,7 +192,6 @@ func (o *AuditOptions) Validate() []error {
var allErrors []error
allErrors = append(allErrors, o.LogOptions.Validate()...)
allErrors = append(allErrors, o.WebhookOptions.Validate()...)
allErrors = append(allErrors, o.DynamicOptions.Validate()...)
return allErrors
}
@ -286,15 +271,10 @@ func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) {
o.WebhookOptions.AddFlags(fs)
o.WebhookOptions.BatchOptions.AddFlags(pluginwebhook.PluginName, fs)
o.WebhookOptions.TruncateOptions.AddFlags(pluginwebhook.PluginName, fs)
o.DynamicOptions.AddFlags(fs)
}
func (o *AuditOptions) ApplyTo(
c *server.Config,
kubeClientConfig *restclient.Config,
informers informers.SharedInformerFactory,
processInfo *ProcessInfo,
webhookOptions *WebhookOptions,
) error {
if o == nil {
return nil
@ -347,23 +327,7 @@ func (o *AuditOptions) ApplyTo(
// 4. Apply dynamic options.
var dynamicBackend audit.Backend
if o.DynamicOptions.enabled() {
// if dynamic is enabled the webhook and log backends need to be wrapped in an enforced backend with the static policy
if webhookBackend != nil {
webhookBackend = pluginenforced.NewBackend(webhookBackend, checker)
}
if logBackend != nil {
logBackend = pluginenforced.NewBackend(logBackend, checker)
}
// build dynamic backend
dynamicBackend, checker, err = o.DynamicOptions.newBackend(c.ExternalAddress, kubeClientConfig, informers, processInfo, webhookOptions)
if err != nil {
return err
}
// union dynamic and webhook backends so that truncate options can be applied to both
dynamicBackend = appendBackend(webhookBackend, dynamicBackend)
dynamicBackend = o.WebhookOptions.TruncateOptions.wrapBackend(dynamicBackend, groupVersion)
} else if webhookBackend != nil {
if webhookBackend != nil {
// if only webhook is enabled wrap it in the truncate options
dynamicBackend = o.WebhookOptions.TruncateOptions.wrapBackend(webhookBackend, groupVersion)
}
@ -610,66 +574,6 @@ func (o *AuditWebhookOptions) newUntruncatedBackend(customDial utilnet.DialFunc)
return webhook, nil
}
func (o *AuditDynamicOptions) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.Enabled, "audit-dynamic-configuration", o.Enabled,
"Enables dynamic audit configuration. This feature also requires the DynamicAuditing feature flag")
}
func (o *AuditDynamicOptions) enabled() bool {
return o.Enabled && utilfeature.DefaultFeatureGate.Enabled(features.DynamicAuditing)
}
func (o *AuditDynamicOptions) Validate() []error {
var allErrors []error
if o.Enabled && !utilfeature.DefaultFeatureGate.Enabled(features.DynamicAuditing) {
allErrors = append(allErrors, fmt.Errorf("--audit-dynamic-configuration set, but DynamicAuditing feature gate is not enabled"))
}
return allErrors
}
func (o *AuditDynamicOptions) newBackend(
hostname string,
kubeClientConfig *restclient.Config,
informers informers.SharedInformerFactory,
processInfo *ProcessInfo,
webhookOptions *WebhookOptions,
) (audit.Backend, policy.Checker, error) {
if err := validateProcessInfo(processInfo); err != nil {
return nil, nil, err
}
clientset, err := kubernetes.NewForConfig(kubeClientConfig)
if err != nil {
return nil, nil, err
}
if webhookOptions == nil {
webhookOptions = NewWebhookOptions()
}
checker := policy.NewDynamicChecker()
informer := informers.Auditregistration().V1alpha1().AuditSinks()
eventSink := &v1core.EventSinkImpl{Interface: clientset.CoreV1().Events(processInfo.Namespace)}
dc := &plugindynamic.Config{
Informer: informer,
BufferedConfig: o.BatchConfig,
EventConfig: plugindynamic.EventConfig{
Sink: eventSink,
Source: corev1.EventSource{
Component: processInfo.Name,
Host: hostname,
},
},
WebhookConfig: plugindynamic.WebhookConfig{
AuthInfoResolverWrapper: webhookOptions.AuthInfoResolverWrapper,
ServiceResolver: webhookOptions.ServiceResolver,
},
}
backend, err := plugindynamic.NewBackend(dc)
if err != nil {
return nil, nil, fmt.Errorf("could not create dynamic audit backend: %v", err)
}
return backend, checker, nil
}
// defaultWebhookBatchConfig returns the default BatchConfig used by the Webhook backend.
func defaultWebhookBatchConfig() pluginbuffered.BatchConfig {
return pluginbuffered.BatchConfig{

View File

@ -23,20 +23,13 @@ import (
"os"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd/api/v1"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/apiserver/pkg/server"
v1 "k8s.io/client-go/tools/clientcmd/api/v1"
)
func TestAuditValidOptions(t *testing.T) {
@ -46,12 +39,6 @@ func TestAuditValidOptions(t *testing.T) {
policy := makeTmpPolicy(t)
defer os.Remove(policy)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicAuditing, true)()
clientConfig := &restclient.Config{}
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
processInfo := &ProcessInfo{"test", "test"}
testCases := []struct {
name string
options func() *AuditOptions
@ -135,56 +122,6 @@ func TestAuditValidOptions(t *testing.T) {
return o
},
expected: "truncate<buffered<webhook>>",
}, {
name: "dynamic",
options: func() *AuditOptions {
o := NewAuditOptions()
o.DynamicOptions.Enabled = true
return o
},
expected: "dynamic[]",
}, {
name: "dynamic with truncating",
options: func() *AuditOptions {
o := NewAuditOptions()
o.DynamicOptions.Enabled = true
o.WebhookOptions.TruncateOptions.Enabled = true
return o
},
expected: "truncate<dynamic[]>",
}, {
name: "dynamic with log",
options: func() *AuditOptions {
o := NewAuditOptions()
o.DynamicOptions.Enabled = true
o.LogOptions.Path = "/audit"
o.PolicyFile = policy
return o
},
expected: "union[enforced<ignoreErrors<log>>,dynamic[]]",
}, {
name: "dynamic with truncating and webhook",
options: func() *AuditOptions {
o := NewAuditOptions()
o.DynamicOptions.Enabled = true
o.WebhookOptions.TruncateOptions.Enabled = true
o.WebhookOptions.ConfigFile = webhookConfig
o.PolicyFile = policy
return o
},
expected: "truncate<union[enforced<buffered<webhook>>,dynamic[]]>",
}, {
name: "dynamic with truncating and webhook and log",
options: func() *AuditOptions {
o := NewAuditOptions()
o.DynamicOptions.Enabled = true
o.WebhookOptions.TruncateOptions.Enabled = true
o.WebhookOptions.ConfigFile = webhookConfig
o.PolicyFile = policy
o.LogOptions.Path = "/audit"
return o
},
expected: "union[enforced<ignoreErrors<log>>,truncate<union[enforced<buffered<webhook>>,dynamic[]]>]",
},
}
for _, tc := range testCases {
@ -200,7 +137,7 @@ func TestAuditValidOptions(t *testing.T) {
assert.Empty(t, options.Validate(), "Options should be valid.")
config := &server.Config{}
require.NoError(t, options.ApplyTo(config, clientConfig, informerFactory, processInfo, nil))
require.NoError(t, options.ApplyTo(config))
if tc.expected == "" {
assert.Nil(t, config.AuditBackend)
} else {
@ -275,13 +212,6 @@ func TestAuditInvalidOptions(t *testing.T) {
o.WebhookOptions.TruncateOptions.TruncateConfig.MaxBatchSize = 1
return o
},
}, {
name: "invalid dynamic flag group",
options: func() *AuditOptions {
o := NewAuditOptions()
o.DynamicOptions.Enabled = true
return o
},
},
}
for _, tc := range testCases {

View File

@ -1,56 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"fmt"
"os"
)
// ProcessInfo holds the apiserver process information used to send events
type ProcessInfo struct {
// Name of the api process to identify events
Name string
// Namespace of the api process to send events
Namespace string
}
// NewProcessInfo returns a new process info with the hostname concatenated to the name given
func NewProcessInfo(name, namespace string) *ProcessInfo {
// try to concat the hostname if available
host, _ := os.Hostname()
if host != "" {
name = fmt.Sprintf("%s-%s", name, host)
}
return &ProcessInfo{
Name: name,
Namespace: namespace,
}
}
// validateProcessInfo checks for a complete process info
func validateProcessInfo(p *ProcessInfo) error {
if p == nil {
return fmt.Errorf("ProcessInfo must be set")
} else if p.Name == "" {
return fmt.Errorf("ProcessInfo name must be set")
} else if p.Namespace == "" {
return fmt.Errorf("ProcessInfo namespace must be set")
}
return nil
}

View File

@ -48,14 +48,11 @@ type RecommendedOptions struct {
// admission plugin initializers to Admission.ApplyTo.
ExtraAdmissionInitializers func(c *server.RecommendedConfig) ([]admission.PluginInitializer, error)
Admission *AdmissionOptions
// ProcessInfo is used to identify events created by the server.
ProcessInfo *ProcessInfo
Webhook *WebhookOptions
// API Server Egress Selector is used to control outbound traffic from the API Server
EgressSelector *EgressSelectorOptions
}
func NewRecommendedOptions(prefix string, codec runtime.Codec, processInfo *ProcessInfo) *RecommendedOptions {
func NewRecommendedOptions(prefix string, codec runtime.Codec) *RecommendedOptions {
sso := NewSecureServingOptions()
// We are composing recommended options for an aggregated api-server,
@ -78,8 +75,6 @@ func NewRecommendedOptions(prefix string, codec runtime.Codec, processInfo *Proc
FeatureGate: feature.DefaultFeatureGate,
ExtraAdmissionInitializers: func(c *server.RecommendedConfig) ([]admission.PluginInitializer, error) { return nil, nil },
Admission: NewAdmissionOptions(),
ProcessInfo: processInfo,
Webhook: NewWebhookOptions(),
EgressSelector: NewEgressSelectorOptions(),
}
}
@ -111,7 +106,7 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
if err := o.Authorization.ApplyTo(&config.Config.Authorization); err != nil {
return err
}
if err := o.Audit.ApplyTo(&config.Config, config.ClientConfig, config.SharedInformerFactory, o.ProcessInfo, o.Webhook); err != nil {
if err := o.Audit.ApplyTo(&config.Config); err != nil {
return err
}
if err := o.Features.ApplyTo(&config.Config); err != nil {

View File

@ -1,34 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
utilwebhook "k8s.io/apiserver/pkg/util/webhook"
)
// WebhookOptions holds the outgoing webhook options
type WebhookOptions struct {
ServiceResolver utilwebhook.ServiceResolver
AuthInfoResolverWrapper utilwebhook.AuthenticationInfoResolverWrapper
}
// NewWebhookOptions returns the default options for outgoing webhooks
func NewWebhookOptions() *WebhookOptions {
return &WebhookOptions{
ServiceResolver: utilwebhook.NewDefaultServiceResolver(),
}
}

View File

@ -1,46 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"time"
bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered"
)
const (
// Default configuration values for ModeBatch when applied to a dynamic plugin
defaultBatchBufferSize = 5000 // Buffer up to 5000 events before starting discarding.
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
)
// NewDefaultWebhookBatchConfig returns new Batch Config objects populated by default values
// for dynamic webhooks
func NewDefaultWebhookBatchConfig() *bufferedplugin.BatchConfig {
return &bufferedplugin.BatchConfig{
BufferSize: defaultBatchBufferSize,
MaxBatchSize: defaultBatchMaxSize,
MaxBatchWait: defaultBatchMaxWait,
ThrottleEnable: true,
ThrottleQPS: defaultBatchThrottleQPS,
ThrottleBurst: defaultBatchThrottleBurst,
AsyncDelegate: true,
}
}

View File

@ -1,365 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"k8s.io/klog/v2"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditinstall "k8s.io/apiserver/pkg/apis/audit/install"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/apiserver/pkg/audit"
webhook "k8s.io/apiserver/pkg/util/webhook"
bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered"
auditinformer "k8s.io/client-go/informers/auditregistration/v1alpha1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
)
// PluginName is the name reported in error metrics.
const PluginName = "dynamic"
// Config holds the configuration for the dynamic backend
type Config struct {
// Informer for the audit sinks
Informer auditinformer.AuditSinkInformer
// EventConfig holds the configuration for event notifications about the AuditSink API objects
EventConfig EventConfig
// BufferedConfig is the runtime buffered configuration
BufferedConfig *bufferedplugin.BatchConfig
// WebhookConfig holds the configuration for outgoing webhooks
WebhookConfig WebhookConfig
}
// WebhookConfig holds the configurations for outgoing webhooks
type WebhookConfig struct {
// AuthInfoResolverWrapper provides the webhook authentication for in-cluster endpoints
AuthInfoResolverWrapper webhook.AuthenticationInfoResolverWrapper
// ServiceResolver knows how to convert a webhook service reference into an actual location.
ServiceResolver webhook.ServiceResolver
}
// EventConfig holds the configurations for sending event notifiations about AuditSink API objects
type EventConfig struct {
// Sink for emitting events
Sink record.EventSink
// Source holds the source information about the event emitter
Source corev1.EventSource
}
// delegate represents a delegate backend that was created from an audit sink configuration
type delegate struct {
audit.Backend
configuration *auditregv1alpha1.AuditSink
stopChan chan struct{}
}
// gracefulShutdown will gracefully shutdown the delegate
func (d *delegate) gracefulShutdown() {
close(d.stopChan)
d.Shutdown()
}
// NewBackend returns a backend that dynamically updates its configuration
// based on a shared informer.
func NewBackend(c *Config) (audit.Backend, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(c.EventConfig.Sink)
scheme := runtime.NewScheme()
err := auditregv1alpha1.AddToScheme(scheme)
if err != nil {
return nil, err
}
recorder := eventBroadcaster.NewRecorder(scheme, c.EventConfig.Source)
if c.BufferedConfig == nil {
c.BufferedConfig = NewDefaultWebhookBatchConfig()
}
cm, err := webhook.NewClientManager([]schema.GroupVersion{auditv1.SchemeGroupVersion}, func(s *runtime.Scheme) error {
auditinstall.Install(s)
return nil
})
if err != nil {
return nil, err
}
// TODO: need a way of injecting authentication before beta
authInfoResolver, err := webhook.NewDefaultAuthenticationInfoResolver("")
if err != nil {
return nil, err
}
cm.SetAuthenticationInfoResolver(authInfoResolver)
cm.SetServiceResolver(c.WebhookConfig.ServiceResolver)
cm.SetAuthenticationInfoResolverWrapper(c.WebhookConfig.AuthInfoResolverWrapper)
manager := &backend{
config: c,
delegates: atomic.Value{},
delegateUpdateMutex: sync.Mutex{},
stopped: false,
webhookClientManager: cm,
recorder: recorder,
}
manager.delegates.Store(syncedDelegates{})
c.Informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
manager.addSink(obj.(*auditregv1alpha1.AuditSink))
},
UpdateFunc: func(oldObj, newObj interface{}) {
manager.updateSink(oldObj.(*auditregv1alpha1.AuditSink), newObj.(*auditregv1alpha1.AuditSink))
},
DeleteFunc: func(obj interface{}) {
sink, ok := obj.(*auditregv1alpha1.AuditSink)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
return
}
sink, ok = tombstone.Obj.(*auditregv1alpha1.AuditSink)
if !ok {
klog.V(2).Infof("Tombstone contained object that is not an AuditSink: %#v", obj)
return
}
}
manager.deleteSink(sink)
},
})
return manager, nil
}
type backend struct {
// delegateUpdateMutex holds an update lock on the delegates
delegateUpdateMutex sync.Mutex
stopped bool
config *Config
delegates atomic.Value
webhookClientManager webhook.ClientManager
recorder record.EventRecorder
}
type syncedDelegates map[types.UID]*delegate
// Names returns the names of the delegate configurations
func (s syncedDelegates) Names() []string {
names := []string{}
for _, delegate := range s {
names = append(names, delegate.configuration.Name)
}
return names
}
// ProcessEvents proccesses the given events per current delegate map
func (b *backend) ProcessEvents(events ...*auditinternal.Event) bool {
for _, d := range b.GetDelegates() {
d.ProcessEvents(events...)
}
// Returning true regardless of results, since dynamic audit backends
// can never cause apiserver request to fail.
return true
}
// Run starts a goroutine that propagates the shutdown signal,
// individual delegates are ran as they are created.
func (b *backend) Run(stopCh <-chan struct{}) error {
go func() {
<-stopCh
b.stopAllDelegates()
}()
return nil
}
// stopAllDelegates closes the stopChan for every delegate to enable
// goroutines to terminate gracefully. This is a helper method to propagate
// the primary stopChan to the current delegate map.
func (b *backend) stopAllDelegates() {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
return
}
b.stopped = true
for _, d := range b.GetDelegates() {
close(d.stopChan)
}
}
// Shutdown calls the shutdown method on all delegates. The stopChan should
// be closed before this is called.
func (b *backend) Shutdown() {
for _, d := range b.GetDelegates() {
d.Shutdown()
}
}
// GetDelegates retrieves current delegates in a safe manner
func (b *backend) GetDelegates() syncedDelegates {
return b.delegates.Load().(syncedDelegates)
}
// copyDelegates returns a copied delegate map
func (b *backend) copyDelegates() syncedDelegates {
c := make(syncedDelegates)
for u, s := range b.GetDelegates() {
c[u] = s
}
return c
}
// setDelegates sets the current delegates in a safe manner
func (b *backend) setDelegates(delegates syncedDelegates) {
b.delegates.Store(delegates)
}
// addSink is called by the shared informer when a sink is added
func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
msg := fmt.Sprintf("Could not add audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID)
klog.Error(msg)
return
}
delegates := b.copyDelegates()
if _, ok := delegates[sink.UID]; ok {
klog.Errorf("Audit sink %q uid: %s already exists, could not readd", sink.Name, sink.UID)
return
}
d, err := b.createAndStartDelegate(sink)
if err != nil {
msg := fmt.Sprintf("Could not add audit sink %q: %v", sink.Name, err)
klog.Error(msg)
b.recorder.Event(sink, corev1.EventTypeWarning, "CreateFailed", msg)
return
}
delegates[sink.UID] = d
b.setDelegates(delegates)
klog.V(2).Infof("Added audit sink: %s", sink.Name)
klog.V(2).Infof("Current audit sinks: %v", delegates.Names())
}
// updateSink is called by the shared informer when a sink is updated.
// The new sink is only rebuilt on spec changes. The new sink must not have
// the same uid as the previous. The new sink will be started before the old
// one is shutdown so no events will be lost
func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
msg := fmt.Sprintf("Could not update old audit sink %q to new audit sink %q. Update to all delegates is stopped.", oldSink.Name, newSink.Name)
klog.Error(msg)
return
}
delegates := b.copyDelegates()
oldDelegate, ok := delegates[oldSink.UID]
if !ok {
klog.Errorf("Could not update audit sink %q uid: %s, old sink does not exist",
oldSink.Name, oldSink.UID)
return
}
// check if spec has changed
eq := reflect.DeepEqual(oldSink.Spec, newSink.Spec)
if eq {
delete(delegates, oldSink.UID)
delegates[newSink.UID] = oldDelegate
b.setDelegates(delegates)
} else {
d, err := b.createAndStartDelegate(newSink)
if err != nil {
msg := fmt.Sprintf("Could not update audit sink %q: %v", oldSink.Name, err)
klog.Error(msg)
b.recorder.Event(newSink, corev1.EventTypeWarning, "UpdateFailed", msg)
return
}
delete(delegates, oldSink.UID)
delegates[newSink.UID] = d
b.setDelegates(delegates)
// graceful shutdown in goroutine as to not block
go oldDelegate.gracefulShutdown()
}
klog.V(2).Infof("Updated audit sink: %s", newSink.Name)
klog.V(2).Infof("Current audit sinks: %v", delegates.Names())
}
// deleteSink is called by the shared informer when a sink is deleted
func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
msg := fmt.Sprintf("Could not delete audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID)
klog.Warning(msg)
return
}
delegates := b.copyDelegates()
delegate, ok := delegates[sink.UID]
if !ok {
klog.Errorf("Could not delete audit sink %q uid: %s, does not exist", sink.Name, sink.UID)
return
}
delete(delegates, sink.UID)
b.setDelegates(delegates)
// graceful shutdown in goroutine as to not block
go delegate.gracefulShutdown()
klog.V(2).Infof("Deleted audit sink: %s", sink.Name)
klog.V(2).Infof("Current audit sinks: %v", delegates.Names())
}
// createAndStartDelegate will build a delegate from an audit sink configuration and run it
func (b *backend) createAndStartDelegate(sink *auditregv1alpha1.AuditSink) (*delegate, error) {
f := factory{
config: b.config,
webhookClientManager: b.webhookClientManager,
sink: sink,
}
delegate, err := f.BuildDelegate()
if err != nil {
return nil, err
}
err = delegate.Run(delegate.stopChan)
if err != nil {
return nil, err
}
return delegate, nil
}
// String returns a string representation of the backend
func (b *backend) String() string {
var delegateStrings []string
for _, delegate := range b.GetDelegates() {
delegateStrings = append(delegateStrings, fmt.Sprintf("%s", delegate))
}
return fmt.Sprintf("%s[%s]", PluginName, strings.Join(delegateStrings, ","))
}

View File

@ -1,319 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
"k8s.io/apiserver/pkg/audit"
webhook "k8s.io/apiserver/pkg/util/webhook"
informers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
)
func TestDynamic(t *testing.T) {
eventList1 := &atomic.Value{}
eventList1.Store(auditinternal.EventList{})
eventList2 := &atomic.Value{}
eventList2.Store(auditinternal.EventList{})
// start test servers
server1 := httptest.NewServer(buildTestHandler(t, eventList1))
defer server1.Close()
server2 := httptest.NewServer(buildTestHandler(t, eventList2))
defer server2.Close()
testPolicy := auditregv1alpha1.Policy{
Level: auditregv1alpha1.LevelMetadata,
Stages: []auditregv1alpha1.Stage{
auditregv1alpha1.StageResponseStarted,
},
}
testEvent := auditinternal.Event{
Level: auditinternal.LevelMetadata,
Stage: auditinternal.StageResponseStarted,
Verb: "get",
RequestURI: "/test/path",
}
testConfig1 := &auditregv1alpha1.AuditSink{
ObjectMeta: metav1.ObjectMeta{
Name: "test1",
UID: types.UID("test1"),
},
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: testPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &server1.URL,
},
},
},
}
testConfig2 := &auditregv1alpha1.AuditSink{
ObjectMeta: metav1.ObjectMeta{
Name: "test2",
UID: types.UID("test2"),
},
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: testPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &server2.URL,
},
},
},
}
badURL := "http://badtest"
badConfig := &auditregv1alpha1.AuditSink{
ObjectMeta: metav1.ObjectMeta{
Name: "bad",
UID: types.UID("bad"),
},
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: testPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &badURL,
},
},
},
}
config, stopChan := defaultTestConfig()
config.BufferedConfig.MaxBatchSize = 1
b, err := NewBackend(config)
require.NoError(t, err)
d := b.(*backend)
err = b.Run(stopChan)
require.NoError(t, err)
t.Run("find none", func(t *testing.T) {
require.Len(t, d.GetDelegates(), 0)
})
success := t.Run("find one", func(t *testing.T) {
d.addSink(testConfig1)
delegates := d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test1"))
require.Equal(t, testConfig1, delegates["test1"].configuration)
// send event and check that it arrives
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
require.True(t, success) // propagate failure
// test that a bad webhook configuration can be recovered from
success = t.Run("bad config", func(t *testing.T) {
d.addSink(badConfig)
delegates := d.GetDelegates()
require.Len(t, delegates, 2)
require.Contains(t, delegates, types.UID("bad"))
require.Equal(t, badConfig, delegates["bad"].configuration)
// send events to the buffer
b.ProcessEvents(&testEvent, &testEvent)
// event is in the buffer see if the sink can be deleted
// this will hang and fail if not handled properly
d.deleteSink(badConfig)
delegates = d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test1"))
require.Equal(t, testConfig1, delegates["test1"].configuration)
})
require.True(t, success) // propagate failure
success = t.Run("find two", func(t *testing.T) {
eventList1.Store(auditinternal.EventList{})
d.addSink(testConfig2)
delegates := d.GetDelegates()
require.Len(t, delegates, 2)
require.Contains(t, delegates, types.UID("test1"))
require.Contains(t, delegates, types.UID("test2"))
require.Equal(t, testConfig1, delegates["test1"].configuration)
require.Equal(t, testConfig2, delegates["test2"].configuration)
// send event to both delegates and check that it arrives in both places
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink 1")
err = checkForEvent(eventList2, testEvent)
require.NoError(t, err, "unable to find events sent to sink 2")
})
require.True(t, success) // propagate failure
success = t.Run("delete one", func(t *testing.T) {
eventList2.Store(auditinternal.EventList{})
d.deleteSink(testConfig1)
delegates := d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test2"))
require.Equal(t, testConfig2, delegates["test2"].configuration)
// send event and check that it arrives to remaining sink
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList2, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
require.True(t, success) // propagate failure
success = t.Run("update one", func(t *testing.T) {
eventList1.Store(auditinternal.EventList{})
oldConfig := *testConfig2
testConfig2.Spec.Webhook.ClientConfig.URL = &server1.URL
testConfig2.UID = types.UID("test2.1")
d.updateSink(&oldConfig, testConfig2)
delegates := d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test2.1"))
require.Equal(t, testConfig2, delegates["test2.1"].configuration)
// send event and check that it arrives to updated sink
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
require.True(t, success) // propagate failure
success = t.Run("update meta only", func(t *testing.T) {
eventList1.Store(auditinternal.EventList{})
oldConfig := *testConfig2
testConfig2.UID = types.UID("test2.2")
testConfig2.Labels = map[string]string{"my": "label"}
d.updateSink(&oldConfig, testConfig2)
delegates := d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test2.2"))
// send event and check that it arrives to same sink
b.ProcessEvents(&testEvent)
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
require.True(t, success) // propagate failure
success = t.Run("shutdown", func(t *testing.T) {
// if the stop signal is not propagated correctly the buffers will not
// close down gracefully, and the shutdown method will hang causing
// the test will timeout.
timeoutChan := make(chan struct{})
successChan := make(chan struct{})
go func() {
time.Sleep(1 * time.Second)
timeoutChan <- struct{}{}
}()
go func() {
close(stopChan)
d.Shutdown()
successChan <- struct{}{}
}()
for {
select {
case <-timeoutChan:
t.Error("shutdown timed out")
return
case <-successChan:
return
}
}
})
require.True(t, success) // propagate failure
}
// checkForEvent will poll to check for an audit event in an atomic event list
func checkForEvent(a *atomic.Value, evSent auditinternal.Event) error {
return wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
el := a.Load().(auditinternal.EventList)
if len(el.Items) != 1 {
return false, nil
}
evFound := el.Items[0]
eq := reflect.DeepEqual(evSent, evFound)
if !eq {
return false, fmt.Errorf("event mismatch -- sent: %+v found: %+v", evSent, evFound)
}
return true, nil
})
}
// buildTestHandler returns a handler that will update the atomic value passed in
// with the event list it receives
func buildTestHandler(t *testing.T, a *atomic.Value) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatalf("could not read request body: %v", err)
}
el := auditinternal.EventList{}
decoder := audit.Codecs.UniversalDecoder(auditv1.SchemeGroupVersion)
if err := runtime.DecodeInto(decoder, body, &el); err != nil {
t.Fatalf("failed decoding buf: %b, apiVersion: %s", body, auditv1.SchemeGroupVersion)
}
defer r.Body.Close()
a.Store(el)
w.WriteHeader(200)
})
}
// defaultTestConfig returns a Config object suitable for testing along with its
// associated stopChan
func defaultTestConfig() (*Config, chan struct{}) {
authWrapper := webhook.AuthenticationInfoResolverWrapper(
func(a webhook.AuthenticationInfoResolver) webhook.AuthenticationInfoResolver { return a },
)
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
stop := make(chan struct{})
eventSink := &v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
informer := informerFactory.Auditregistration().V1alpha1().AuditSinks()
return &Config{
Informer: informer,
EventConfig: EventConfig{Sink: eventSink},
BufferedConfig: NewDefaultWebhookBatchConfig(),
WebhookConfig: WebhookConfig{
AuthInfoResolverWrapper: authWrapper,
ServiceResolver: webhook.NewDefaultServiceResolver(),
},
}, stop
}

View File

@ -1,93 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package enforced
import (
"fmt"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
ev "k8s.io/apiserver/pkg/audit/event"
"k8s.io/apiserver/pkg/audit/policy"
)
// PluginName is the name reported in error metrics.
const PluginName = "enforced"
// Backend filters audit events according to the policy
// trimming them as necessary to match the level
type Backend struct {
policyChecker policy.Checker
delegateBackend audit.Backend
}
// NewBackend returns an enforced audit backend that wraps delegate backend.
// Enforced backend automatically runs and shuts down the delegate backend.
func NewBackend(delegate audit.Backend, p policy.Checker) audit.Backend {
return &Backend{
policyChecker: p,
delegateBackend: delegate,
}
}
// Run the delegate backend
func (b Backend) Run(stopCh <-chan struct{}) error {
return b.delegateBackend.Run(stopCh)
}
// Shutdown the delegate backend
func (b Backend) Shutdown() {
b.delegateBackend.Shutdown()
}
// ProcessEvents enforces policy on a shallow copy of the given event
// dropping any sections that don't conform
func (b Backend) ProcessEvents(events ...*auditinternal.Event) bool {
for _, event := range events {
if event == nil {
continue
}
attr, err := ev.NewAttributes(event)
if err != nil {
audit.HandlePluginError(PluginName, err, event)
continue
}
level, stages := b.policyChecker.LevelAndStages(attr)
if level == auditinternal.LevelNone {
continue
}
// make shallow copy before modifying to satisfy interface definition
ev := *event
e, err := policy.EnforcePolicy(&ev, level, stages)
if err != nil {
audit.HandlePluginError(PluginName, err, event)
continue
}
if e == nil {
continue
}
b.delegateBackend.ProcessEvents(e)
}
// Returning true regardless of results, since dynamic audit backends
// can never cause apiserver request to fail.
return true
}
// String returns a string representation of the backend
func (b Backend) String() string {
return fmt.Sprintf("%s<%s>", PluginName, b.delegateBackend)
}

View File

@ -1,118 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package enforced
import (
"testing"
"github.com/stretchr/testify/require"
authnv1 "k8s.io/api/authentication/v1"
"k8s.io/apimachinery/pkg/runtime"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit/policy"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
fakeplugin "k8s.io/apiserver/plugin/pkg/audit/fake"
)
func TestEnforced(t *testing.T) {
testCases := []struct {
name string
event *auditinternal.Event
policy auditinternal.Policy
attribs authorizer.Attributes
expected []*auditinternal.Event
}{
{
name: "enforce level",
event: &auditinternal.Event{
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: "/apis/extensions/v1beta1",
RequestObject: &runtime.Unknown{Raw: []byte(`test`)},
ResponseObject: &runtime.Unknown{Raw: []byte(`test`)},
},
policy: auditinternal.Policy{
Rules: []auditinternal.PolicyRule{
{
Level: auditinternal.LevelMetadata,
},
},
},
expected: []*auditinternal.Event{
{
Level: auditinternal.LevelMetadata,
Stage: auditinternal.StageResponseComplete,
RequestURI: "/apis/extensions/v1beta1",
},
},
},
{
name: "enforce policy rule",
event: &auditinternal.Event{
Level: auditinternal.LevelRequestResponse,
Stage: auditinternal.StageResponseComplete,
RequestURI: "/apis/extensions/v1beta1",
User: authnv1.UserInfo{
Username: user.Anonymous,
},
RequestObject: &runtime.Unknown{Raw: []byte(`test`)},
ResponseObject: &runtime.Unknown{Raw: []byte(`test`)},
},
policy: auditinternal.Policy{
Rules: []auditinternal.PolicyRule{
{
Level: auditinternal.LevelNone,
Users: []string{user.Anonymous},
},
{
Level: auditinternal.LevelMetadata,
},
},
},
expected: []*auditinternal.Event{},
},
{
name: "nil event",
event: nil,
policy: auditinternal.Policy{
Rules: []auditinternal.PolicyRule{
{
Level: auditinternal.LevelMetadata,
},
},
},
expected: []*auditinternal.Event{},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ev := []*auditinternal.Event{}
fakeBackend := fakeplugin.Backend{
OnRequest: func(events []*auditinternal.Event) {
ev = events
},
}
b := NewBackend(&fakeBackend, policy.NewChecker(&tc.policy))
defer b.Shutdown()
b.ProcessEvents(tc.event)
require.Equal(t, tc.expected, ev)
})
}
}

View File

@ -1,91 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"fmt"
"time"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/audit/policy"
auditutil "k8s.io/apiserver/pkg/audit/util"
"k8s.io/apiserver/pkg/util/webhook"
bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered"
enforcedplugin "k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced"
webhookplugin "k8s.io/apiserver/plugin/pkg/audit/webhook"
)
// TODO: find a common place for all the default retry backoffs
const retryBackoff = 500 * time.Millisecond
// factory builds a delegate from an AuditSink
type factory struct {
config *Config
webhookClientManager webhook.ClientManager
sink *auditregv1alpha1.AuditSink
}
// BuildDelegate creates a delegate from the AuditSink object
func (f *factory) BuildDelegate() (*delegate, error) {
backend, err := f.buildWebhookBackend()
if err != nil {
return nil, err
}
backend = f.applyEnforcedOpts(backend)
backend = f.applyBufferedOpts(backend)
ch := make(chan struct{})
return &delegate{
Backend: backend,
configuration: f.sink,
stopChan: ch,
}, nil
}
func (f *factory) buildWebhookBackend() (audit.Backend, error) {
hookClient := auditutil.HookClientConfigForSink(f.sink)
client, err := f.webhookClientManager.HookClient(hookClient)
if err != nil {
return nil, fmt.Errorf("could not create webhook client: %v", err)
}
backend := webhookplugin.NewDynamicBackend(client, retryBackoff)
return backend, nil
}
func (f *factory) applyEnforcedOpts(delegate audit.Backend) audit.Backend {
pol := policy.ConvertDynamicPolicyToInternal(&f.sink.Spec.Policy)
checker := policy.NewChecker(pol)
eb := enforcedplugin.NewBackend(delegate, checker)
return eb
}
func (f *factory) applyBufferedOpts(delegate audit.Backend) audit.Backend {
bc := f.config.BufferedConfig
tc := f.sink.Spec.Webhook.Throttle
if tc != nil {
bc.ThrottleEnable = true
if tc.Burst != nil {
bc.ThrottleBurst = int(*tc.Burst)
}
if tc.QPS != nil {
bc.ThrottleQPS = float32(*tc.QPS)
}
} else {
bc.ThrottleEnable = false
}
return bufferedplugin.NewBackend(delegate, *bc)
}

View File

@ -1,146 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"testing"
"github.com/stretchr/testify/require"
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
utilpointer "k8s.io/utils/pointer"
)
func TestToDelegate(t *testing.T) {
config, _ := defaultTestConfig()
defaultPolicy := auditregv1alpha1.Policy{
Level: auditregv1alpha1.LevelMetadata,
}
u := "http://localhost:4444"
for _, tc := range []struct {
name string
auditConfig *auditregv1alpha1.AuditSink
throttleConfig *auditregv1alpha1.WebhookThrottleConfig
expectedBackend string
}{
{
name: "build full",
auditConfig: &auditregv1alpha1.AuditSink{
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: defaultPolicy,
Webhook: auditregv1alpha1.Webhook{
Throttle: &auditregv1alpha1.WebhookThrottleConfig{
QPS: utilpointer.Int64Ptr(10),
Burst: utilpointer.Int64Ptr(5),
},
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &u,
},
},
},
},
expectedBackend: "buffered<enforced<dynamic_webhook>>",
},
{
name: "build no throttle",
auditConfig: &auditregv1alpha1.AuditSink{
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: defaultPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &u,
},
},
},
},
expectedBackend: "buffered<enforced<dynamic_webhook>>",
},
} {
t.Run(tc.name, func(t *testing.T) {
b, err := NewBackend(config)
require.NoError(t, err)
c := factory{
config: b.(*backend).config,
webhookClientManager: b.(*backend).webhookClientManager,
sink: tc.auditConfig,
}
d, err := c.BuildDelegate()
require.NoError(t, err)
require.Equal(t, tc.expectedBackend, d.String())
})
}
}
func TestBuildWebhookBackend(t *testing.T) {
defaultPolicy := auditregv1alpha1.Policy{
Level: auditregv1alpha1.LevelMetadata,
}
config, _ := defaultTestConfig()
b, err := NewBackend(config)
require.NoError(t, err)
d := b.(*backend)
u := "http://localhost:4444"
for _, tc := range []struct {
name string
auditConfig *auditregv1alpha1.AuditSink
shouldErr bool
expectedBackend string
}{
{
name: "build full",
auditConfig: &auditregv1alpha1.AuditSink{
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: defaultPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &u,
},
},
},
},
expectedBackend: "dynamic_webhook",
shouldErr: false,
},
{
name: "fail missing url",
auditConfig: &auditregv1alpha1.AuditSink{
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: defaultPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{},
},
},
},
shouldErr: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
c := &factory{
config: config,
webhookClientManager: d.webhookClientManager,
sink: tc.auditConfig,
}
ab, err := c.buildWebhookBackend()
if tc.shouldErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedBackend, ab.String())
})
}
}