Implemented truncating audit backend
Signed-off-by: Mik Vyatskov <vmik@google.com> Kubernetes-commit: 52fae991305e3252ccc5c9c86a9b7abc04c149af
This commit is contained in:
parent
a2264066a6
commit
53e0783ab7
|
@ -37,6 +37,7 @@ import (
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
|
pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
|
||||||
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
|
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
|
||||||
|
plugintruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
|
||||||
pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
|
pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -89,6 +90,14 @@ type AuditBatchOptions struct {
|
||||||
BatchConfig pluginbuffered.BatchConfig
|
BatchConfig pluginbuffered.BatchConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type AuditTruncateOptions struct {
|
||||||
|
// Whether truncating is enabled or not.
|
||||||
|
Enabled bool
|
||||||
|
|
||||||
|
// Truncating configuration.
|
||||||
|
TruncateConfig plugintruncate.Config
|
||||||
|
}
|
||||||
|
|
||||||
// AuditLogOptions determines the output of the structured audit log by default.
|
// AuditLogOptions determines the output of the structured audit log by default.
|
||||||
// If the AdvancedAuditing feature is set to false, AuditLogOptions holds the legacy
|
// If the AdvancedAuditing feature is set to false, AuditLogOptions holds the legacy
|
||||||
// audit log writer.
|
// audit log writer.
|
||||||
|
@ -100,6 +109,7 @@ type AuditLogOptions struct {
|
||||||
Format string
|
Format string
|
||||||
|
|
||||||
BatchOptions AuditBatchOptions
|
BatchOptions AuditBatchOptions
|
||||||
|
TruncateOptions AuditTruncateOptions
|
||||||
|
|
||||||
// API group version used for serializing audit events.
|
// API group version used for serializing audit events.
|
||||||
GroupVersionString string
|
GroupVersionString string
|
||||||
|
@ -111,6 +121,7 @@ type AuditWebhookOptions struct {
|
||||||
InitialBackoff time.Duration
|
InitialBackoff time.Duration
|
||||||
|
|
||||||
BatchOptions AuditBatchOptions
|
BatchOptions AuditBatchOptions
|
||||||
|
TruncateOptions AuditTruncateOptions
|
||||||
|
|
||||||
// API group version used for serializing audit events.
|
// API group version used for serializing audit events.
|
||||||
GroupVersionString string
|
GroupVersionString string
|
||||||
|
@ -122,11 +133,12 @@ func NewAuditOptions() *AuditOptions {
|
||||||
|
|
||||||
return &AuditOptions{
|
return &AuditOptions{
|
||||||
WebhookOptions: AuditWebhookOptions{
|
WebhookOptions: AuditWebhookOptions{
|
||||||
|
InitialBackoff: pluginwebhook.DefaultInitialBackoff,
|
||||||
BatchOptions: AuditBatchOptions{
|
BatchOptions: AuditBatchOptions{
|
||||||
Mode: ModeBatch,
|
Mode: ModeBatch,
|
||||||
BatchConfig: pluginbuffered.NewDefaultBatchConfig(),
|
BatchConfig: pluginbuffered.NewDefaultBatchConfig(),
|
||||||
},
|
},
|
||||||
InitialBackoff: pluginwebhook.DefaultInitialBackoff,
|
TruncateOptions: NewAuditTruncateOptions(),
|
||||||
GroupVersionString: "audit.k8s.io/v1beta1",
|
GroupVersionString: "audit.k8s.io/v1beta1",
|
||||||
},
|
},
|
||||||
LogOptions: AuditLogOptions{
|
LogOptions: AuditLogOptions{
|
||||||
|
@ -135,11 +147,22 @@ func NewAuditOptions() *AuditOptions {
|
||||||
Mode: ModeBlocking,
|
Mode: ModeBlocking,
|
||||||
BatchConfig: defaultLogBatchConfig,
|
BatchConfig: defaultLogBatchConfig,
|
||||||
},
|
},
|
||||||
|
TruncateOptions: NewAuditTruncateOptions(),
|
||||||
GroupVersionString: "audit.k8s.io/v1beta1",
|
GroupVersionString: "audit.k8s.io/v1beta1",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewAuditTruncateOptions() AuditTruncateOptions {
|
||||||
|
return AuditTruncateOptions{
|
||||||
|
Enabled: false,
|
||||||
|
TruncateConfig: plugintruncate.Config{
|
||||||
|
MaxBatchSize: 10 * 1024 * 1024, // 10MB
|
||||||
|
MaxEventSize: 100 * 1024, // 100KB
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Validate checks invalid config combination
|
// Validate checks invalid config combination
|
||||||
func (o *AuditOptions) Validate() []error {
|
func (o *AuditOptions) Validate() []error {
|
||||||
if o == nil {
|
if o == nil {
|
||||||
|
@ -232,8 +255,10 @@ func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) {
|
||||||
|
|
||||||
o.LogOptions.AddFlags(fs)
|
o.LogOptions.AddFlags(fs)
|
||||||
o.LogOptions.BatchOptions.AddFlags(pluginlog.PluginName, fs)
|
o.LogOptions.BatchOptions.AddFlags(pluginlog.PluginName, fs)
|
||||||
|
o.LogOptions.TruncateOptions.AddFlags(pluginlog.PluginName, fs)
|
||||||
o.WebhookOptions.AddFlags(fs)
|
o.WebhookOptions.AddFlags(fs)
|
||||||
o.WebhookOptions.BatchOptions.AddFlags(pluginwebhook.PluginName, fs)
|
o.WebhookOptions.BatchOptions.AddFlags(pluginwebhook.PluginName, fs)
|
||||||
|
o.WebhookOptions.TruncateOptions.AddFlags(pluginwebhook.PluginName, fs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *AuditOptions) ApplyTo(c *server.Config) error {
|
func (o *AuditOptions) ApplyTo(c *server.Config) error {
|
||||||
|
@ -309,6 +334,38 @@ func (o *AuditBatchOptions) wrapBackend(delegate audit.Backend) audit.Backend {
|
||||||
return pluginbuffered.NewBackend(delegate, o.BatchConfig)
|
return pluginbuffered.NewBackend(delegate, o.BatchConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *AuditTruncateOptions) Validate(pluginName string) error {
|
||||||
|
config := o.TruncateConfig
|
||||||
|
if config.MaxEventSize <= 0 {
|
||||||
|
return fmt.Errorf("invalid audit truncate %s max event size %v, must be a positive number", pluginName, config.MaxEventSize)
|
||||||
|
}
|
||||||
|
if config.MaxBatchSize < config.MaxEventSize {
|
||||||
|
return fmt.Errorf("invalid audit truncate %s max batch size %v, must be greater than "+
|
||||||
|
"max event size (%v)", pluginName, config.MaxBatchSize, config.MaxEventSize)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *AuditTruncateOptions) AddFlags(pluginName string, fs *pflag.FlagSet) {
|
||||||
|
fs.BoolVar(&o.Enabled, fmt.Sprintf("audit-%s-truncate-enabled", pluginName),
|
||||||
|
o.Enabled, "Whether event and batch truncating is enabled.")
|
||||||
|
fs.Int64Var(&o.TruncateConfig.MaxBatchSize, fmt.Sprintf("audit-%s-truncate-max-batch-size", pluginName),
|
||||||
|
o.TruncateConfig.MaxBatchSize, "Maximum size of the batch sent to the underlying backend. "+
|
||||||
|
"Actual serialized size can be several hundreds of bytes greater. If a batch exceeds this limit, "+
|
||||||
|
"it is split into several batches of smaller size.")
|
||||||
|
fs.Int64Var(&o.TruncateConfig.MaxEventSize, fmt.Sprintf("audit-%s-truncate-max-event-size", pluginName),
|
||||||
|
o.TruncateConfig.MaxEventSize, "Maximum size of the audit event sent to the underlying backend. "+
|
||||||
|
"If the size of an event is greater than this number, first request and response are removed, and"+
|
||||||
|
"if this doesn't reduce the size enough, event is discarded.")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *AuditTruncateOptions) wrapBackend(delegate audit.Backend, gv schema.GroupVersion) audit.Backend {
|
||||||
|
if !o.Enabled {
|
||||||
|
return delegate
|
||||||
|
}
|
||||||
|
return plugintruncate.NewBackend(delegate, o.TruncateConfig, gv)
|
||||||
|
}
|
||||||
|
|
||||||
func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) {
|
func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) {
|
||||||
fs.StringVar(&o.Path, "audit-log-path", o.Path,
|
fs.StringVar(&o.Path, "audit-log-path", o.Path,
|
||||||
"If set, all requests coming to the apiserver will be logged to this file. '-' means standard out.")
|
"If set, all requests coming to the apiserver will be logged to this file. '-' means standard out.")
|
||||||
|
@ -337,6 +394,9 @@ func (o *AuditLogOptions) Validate() []error {
|
||||||
if err := validateBackendBatchOptions(pluginlog.PluginName, o.BatchOptions); err != nil {
|
if err := validateBackendBatchOptions(pluginlog.PluginName, o.BatchOptions); err != nil {
|
||||||
allErrors = append(allErrors, err)
|
allErrors = append(allErrors, err)
|
||||||
}
|
}
|
||||||
|
if err := o.TruncateOptions.Validate(pluginlog.PluginName); err != nil {
|
||||||
|
allErrors = append(allErrors, err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := validateGroupVersionString(o.GroupVersionString); err != nil {
|
if err := validateGroupVersionString(o.GroupVersionString); err != nil {
|
||||||
allErrors = append(allErrors, err)
|
allErrors = append(allErrors, err)
|
||||||
|
@ -395,7 +455,9 @@ func (o *AuditLogOptions) advancedApplyTo(c *server.Config) error {
|
||||||
if w := o.getWriter(); w != nil {
|
if w := o.getWriter(); w != nil {
|
||||||
groupVersion, _ := schema.ParseGroupVersion(o.GroupVersionString)
|
groupVersion, _ := schema.ParseGroupVersion(o.GroupVersionString)
|
||||||
log := pluginlog.NewBackend(w, o.Format, groupVersion)
|
log := pluginlog.NewBackend(w, o.Format, groupVersion)
|
||||||
c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(log))
|
log = o.BatchOptions.wrapBackend(log)
|
||||||
|
log = o.TruncateOptions.wrapBackend(log, groupVersion)
|
||||||
|
c.AuditBackend = appendBackend(c.AuditBackend, log)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -429,6 +491,9 @@ func (o *AuditWebhookOptions) Validate() []error {
|
||||||
if err := validateBackendBatchOptions(pluginwebhook.PluginName, o.BatchOptions); err != nil {
|
if err := validateBackendBatchOptions(pluginwebhook.PluginName, o.BatchOptions); err != nil {
|
||||||
allErrors = append(allErrors, err)
|
allErrors = append(allErrors, err)
|
||||||
}
|
}
|
||||||
|
if err := o.TruncateOptions.Validate(pluginwebhook.PluginName); err != nil {
|
||||||
|
allErrors = append(allErrors, err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := validateGroupVersionString(o.GroupVersionString); err != nil {
|
if err := validateGroupVersionString(o.GroupVersionString); err != nil {
|
||||||
allErrors = append(allErrors, err)
|
allErrors = append(allErrors, err)
|
||||||
|
@ -451,6 +516,8 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("initializing audit webhook: %v", err)
|
return fmt.Errorf("initializing audit webhook: %v", err)
|
||||||
}
|
}
|
||||||
c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(webhook))
|
webhook = o.BatchOptions.wrapBackend(webhook)
|
||||||
|
webhook = o.TruncateOptions.wrapBackend(webhook, groupVersion)
|
||||||
|
c.AuditBackend = appendBackend(c.AuditBackend, webhook)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,6 +78,15 @@ func TestAuditValidOptions(t *testing.T) {
|
||||||
return o
|
return o
|
||||||
},
|
},
|
||||||
expected: "union[buffered<log>,webhook]",
|
expected: "union[buffered<log>,webhook]",
|
||||||
|
}, {
|
||||||
|
name: "default webhook with truncating",
|
||||||
|
options: func() *AuditOptions {
|
||||||
|
o := NewAuditOptions()
|
||||||
|
o.WebhookOptions.ConfigFile = webhookConfig
|
||||||
|
o.WebhookOptions.TruncateOptions.Enabled = true
|
||||||
|
return o
|
||||||
|
},
|
||||||
|
expected: "truncate<buffered<webhook>>",
|
||||||
}}
|
}}
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
@ -148,6 +157,25 @@ func TestAuditInvalidOptions(t *testing.T) {
|
||||||
o.WebhookOptions.BatchOptions.BatchConfig.ThrottleQPS = -1
|
o.WebhookOptions.BatchOptions.BatchConfig.ThrottleQPS = -1
|
||||||
return o
|
return o
|
||||||
},
|
},
|
||||||
|
}, {
|
||||||
|
name: "invalid webhook truncate max event size",
|
||||||
|
options: func() *AuditOptions {
|
||||||
|
o := NewAuditOptions()
|
||||||
|
o.WebhookOptions.ConfigFile = "/audit"
|
||||||
|
o.WebhookOptions.TruncateOptions.Enabled = true
|
||||||
|
o.WebhookOptions.TruncateOptions.TruncateConfig.MaxEventSize = -1
|
||||||
|
return o
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
name: "invalid webhook truncate max batch size",
|
||||||
|
options: func() *AuditOptions {
|
||||||
|
o := NewAuditOptions()
|
||||||
|
o.WebhookOptions.ConfigFile = "/audit"
|
||||||
|
o.WebhookOptions.TruncateOptions.Enabled = true
|
||||||
|
o.WebhookOptions.TruncateOptions.TruncateConfig.MaxEventSize = 2
|
||||||
|
o.WebhookOptions.TruncateOptions.TruncateConfig.MaxBatchSize = 1
|
||||||
|
return o
|
||||||
|
},
|
||||||
}}
|
}}
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
|
|
@ -24,7 +24,7 @@ import (
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
||||||
"k8s.io/apiserver/pkg/audit"
|
"k8s.io/apiserver/plugin/pkg/audit/fake"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -83,7 +83,7 @@ func TestBufferedBackendCollectEvents(t *testing.T) {
|
||||||
t.Run(tc.desc, func(t *testing.T) {
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
backend := NewBackend(&fakeBackend{}, config).(*bufferedBackend)
|
backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend)
|
||||||
|
|
||||||
backend.ProcessEvents(newEvents(tc.numEvents)...)
|
backend.ProcessEvents(newEvents(tc.numEvents)...)
|
||||||
batch := backend.collectEvents(tc.timer, tc.stopCh)
|
batch := backend.collectEvents(tc.timer, tc.stopCh)
|
||||||
|
@ -96,7 +96,7 @@ func TestBufferedBackendCollectEvents(t *testing.T) {
|
||||||
func TestBufferedBackendProcessEventsAfterStop(t *testing.T) {
|
func TestBufferedBackendProcessEventsAfterStop(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
backend := NewBackend(&fakeBackend{}, NewDefaultBatchConfig()).(*bufferedBackend)
|
backend := NewBackend(&fake.Backend{}, NewDefaultBatchConfig()).(*bufferedBackend)
|
||||||
|
|
||||||
backend.Run(closedStopCh)
|
backend.Run(closedStopCh)
|
||||||
backend.Shutdown()
|
backend.Shutdown()
|
||||||
|
@ -111,7 +111,7 @@ func TestBufferedBackendProcessEventsBufferFull(t *testing.T) {
|
||||||
|
|
||||||
config := NewDefaultBatchConfig()
|
config := NewDefaultBatchConfig()
|
||||||
config.BufferSize = 1
|
config.BufferSize = 1
|
||||||
backend := NewBackend(&fakeBackend{}, config).(*bufferedBackend)
|
backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend)
|
||||||
|
|
||||||
backend.ProcessEvents(newEvents(2)...)
|
backend.ProcessEvents(newEvents(2)...)
|
||||||
|
|
||||||
|
@ -123,8 +123,8 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) {
|
||||||
|
|
||||||
delegatedCallStartCh := make(chan struct{})
|
delegatedCallStartCh := make(chan struct{})
|
||||||
delegatedCallEndCh := make(chan struct{})
|
delegatedCallEndCh := make(chan struct{})
|
||||||
delegateBackend := &fakeBackend{
|
delegateBackend := &fake.Backend{
|
||||||
onRequest: func(_ []*auditinternal.Event) {
|
OnRequest: func(_ []*auditinternal.Event) {
|
||||||
close(delegatedCallStartCh)
|
close(delegatedCallStartCh)
|
||||||
<-delegatedCallEndCh
|
<-delegatedCallEndCh
|
||||||
},
|
},
|
||||||
|
@ -159,23 +159,3 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) {
|
||||||
close(delegatedCallEndCh)
|
close(delegatedCallEndCh)
|
||||||
<-shutdownEndCh
|
<-shutdownEndCh
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakeBackend struct {
|
|
||||||
onRequest func(events []*auditinternal.Event)
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ audit.Backend = &fakeBackend{}
|
|
||||||
|
|
||||||
func (b *fakeBackend) Run(stopCh <-chan struct{}) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *fakeBackend) Shutdown() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *fakeBackend) ProcessEvents(ev ...*auditinternal.Event) {
|
|
||||||
if b.onRequest != nil {
|
|
||||||
b.onRequest(ev)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Package fake provides a fake audit.Backend interface implementation for testing.
|
||||||
|
package fake // import "k8s.io/apiserver/plugin/pkg/audit/fake"
|
|
@ -0,0 +1,46 @@
|
||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package fake
|
||||||
|
|
||||||
|
import (
|
||||||
|
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
||||||
|
"k8s.io/apiserver/pkg/audit"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ audit.Backend = &Backend{}
|
||||||
|
|
||||||
|
// Backend is a fake audit backend for testing purposes.
|
||||||
|
type Backend struct {
|
||||||
|
OnRequest func(events []*auditinternal.Event)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run does nothing.
|
||||||
|
func (b *Backend) Run(stopCh <-chan struct{}) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown does nothing.
|
||||||
|
func (b *Backend) Shutdown() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProcessEvents calls a callback on a batch, if present.
|
||||||
|
func (b *Backend) ProcessEvents(ev ...*auditinternal.Event) {
|
||||||
|
if b.OnRequest != nil {
|
||||||
|
b.OnRequest(ev)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,19 @@
|
||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Package truncate provides an implementation for the audit.Backend interface
|
||||||
|
// that truncates audit events and sends them to the delegate audit.Backend.
|
||||||
|
package truncate // import "k8s.io/apiserver/plugin/pkg/audit/truncate"
|
|
@ -0,0 +1,158 @@
|
||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package truncate
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||||
|
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
||||||
|
"k8s.io/apiserver/pkg/audit"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// PluginName is the name reported in error metrics.
|
||||||
|
PluginName = "truncate"
|
||||||
|
|
||||||
|
// annotationKey defines the name of the annotation used to indicate truncation.
|
||||||
|
annotationKey = "audit.k8s.io/truncated"
|
||||||
|
// annotationValue defines the value of the annotation used to indicate truncation.
|
||||||
|
annotationValue = "true"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Config represents truncating backend configuration.
|
||||||
|
type Config struct {
|
||||||
|
// MaxEventSize defines max allowed size of the event. If the event is larger,
|
||||||
|
// truncating will be performed.
|
||||||
|
MaxEventSize int64
|
||||||
|
|
||||||
|
// MaxBatchSize defined max allowed size of the batch of events, passed to the backend.
|
||||||
|
// If the total size of the batch is larger than this number, batch will be split. Actual
|
||||||
|
// size of the serialized request might be slightly higher, on the order of hundreds of bytes.
|
||||||
|
MaxBatchSize int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type backend struct {
|
||||||
|
// The delegate backend that actually exports events.
|
||||||
|
delegateBackend audit.Backend
|
||||||
|
|
||||||
|
// Configuration used for truncation.
|
||||||
|
c Config
|
||||||
|
|
||||||
|
// Encoder used to calculate audit event sizes.
|
||||||
|
e runtime.Encoder
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ audit.Backend = &backend{}
|
||||||
|
|
||||||
|
// NewBackend returns a new truncating backend, using configuration passed in the parameters.
|
||||||
|
func NewBackend(delegateBackend audit.Backend, config Config, groupVersion schema.GroupVersion) audit.Backend {
|
||||||
|
return &backend{
|
||||||
|
delegateBackend: delegateBackend,
|
||||||
|
c: config,
|
||||||
|
e: audit.Codecs.LegacyCodec(groupVersion),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *backend) ProcessEvents(events ...*auditinternal.Event) {
|
||||||
|
var errors []error
|
||||||
|
var impacted []*auditinternal.Event
|
||||||
|
var batch []*auditinternal.Event
|
||||||
|
var batchSize int64
|
||||||
|
for _, event := range events {
|
||||||
|
size, err := b.calcSize(event)
|
||||||
|
// If event was correctly serialized, but the size is more than allowed
|
||||||
|
// and it makes sense to do trimming, i.e. there's a request and/or
|
||||||
|
// response present, try to strip away request and response.
|
||||||
|
if err == nil && size > b.c.MaxEventSize && event.Level.GreaterOrEqual(auditinternal.LevelRequest) {
|
||||||
|
event = truncate(event)
|
||||||
|
size, err = b.calcSize(event)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
errors = append(errors, err)
|
||||||
|
impacted = append(impacted, event)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if size > b.c.MaxEventSize {
|
||||||
|
errors = append(errors, fmt.Errorf("event is too large even after truncating"))
|
||||||
|
impacted = append(impacted, event)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(batch) > 0 && batchSize+size > b.c.MaxBatchSize {
|
||||||
|
b.delegateBackend.ProcessEvents(batch...)
|
||||||
|
batch = []*auditinternal.Event{}
|
||||||
|
batchSize = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
batchSize += size
|
||||||
|
batch = append(batch, event)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(batch) > 0 {
|
||||||
|
b.delegateBackend.ProcessEvents(batch...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(impacted) > 0 {
|
||||||
|
audit.HandlePluginError(PluginName, utilerrors.NewAggregate(errors), impacted...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// truncate removed request and response objects from the audit events,
|
||||||
|
// to try and keep at least metadata.
|
||||||
|
func truncate(e *auditinternal.Event) *auditinternal.Event {
|
||||||
|
// Make a shallow copy to avoid copying response/request objects.
|
||||||
|
newEvent := &auditinternal.Event{}
|
||||||
|
*newEvent = *e
|
||||||
|
|
||||||
|
newEvent.RequestObject = nil
|
||||||
|
newEvent.ResponseObject = nil
|
||||||
|
audit.LogAnnotation(newEvent, annotationKey, annotationValue)
|
||||||
|
return newEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *backend) Run(stopCh <-chan struct{}) error {
|
||||||
|
// Nothing to do here
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *backend) Shutdown() {
|
||||||
|
// Nothing to do here
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *backend) calcSize(e *auditinternal.Event) (int64, error) {
|
||||||
|
s := &sizer{}
|
||||||
|
if err := b.e.Encode(e, s); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return s.Size, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *backend) String() string {
|
||||||
|
return fmt.Sprintf("%s<%s>", PluginName, b.delegateBackend)
|
||||||
|
}
|
||||||
|
|
||||||
|
type sizer struct {
|
||||||
|
Size int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *sizer) Write(p []byte) (n int, err error) {
|
||||||
|
s.Size += int64(len(p))
|
||||||
|
return len(p), nil
|
||||||
|
}
|
|
@ -0,0 +1,141 @@
|
||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package truncate
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
||||||
|
auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1"
|
||||||
|
"k8s.io/apiserver/plugin/pkg/audit/fake"
|
||||||
|
// Importing just for the schema definitions.
|
||||||
|
_ "k8s.io/apiserver/plugin/pkg/audit/webhook"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
defaultConfig = Config{
|
||||||
|
MaxBatchSize: 4 * 1024 * 1024,
|
||||||
|
MaxEventSize: 100 * 1024,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTruncatingEvents(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
desc string
|
||||||
|
event *auditinternal.Event
|
||||||
|
wantDropped bool
|
||||||
|
wantTruncated bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "Empty event should not be truncated",
|
||||||
|
event: &auditinternal.Event{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "Event with too large body should be truncated",
|
||||||
|
event: &auditinternal.Event{
|
||||||
|
Level: auditinternal.LevelRequest,
|
||||||
|
RequestObject: &runtime.Unknown{
|
||||||
|
Raw: []byte("\"" + strings.Repeat("A", int(defaultConfig.MaxEventSize)) + "\""),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantTruncated: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "Event with too large metadata should be dropped",
|
||||||
|
event: &auditinternal.Event{
|
||||||
|
Annotations: map[string]string{
|
||||||
|
"key": strings.Repeat("A", int(defaultConfig.MaxEventSize)),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantDropped: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
tc := tc
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
var event *auditinternal.Event
|
||||||
|
|
||||||
|
fb := &fake.Backend{
|
||||||
|
OnRequest: func(events []*auditinternal.Event) {
|
||||||
|
require.Equal(t, 1, len(events), "Expected single event in batch")
|
||||||
|
event = events[0]
|
||||||
|
},
|
||||||
|
}
|
||||||
|
b := NewBackend(fb, defaultConfig, auditv1beta1.SchemeGroupVersion)
|
||||||
|
b.ProcessEvents(tc.event)
|
||||||
|
|
||||||
|
require.Equal(t, !tc.wantDropped, event != nil, "Incorrect event presence")
|
||||||
|
if tc.wantTruncated {
|
||||||
|
require.Equal(t, annotationValue, event.Annotations[annotationKey], "Annotation should be present")
|
||||||
|
require.Nil(t, event.RequestObject, "After truncation request should be nil")
|
||||||
|
require.Nil(t, event.ResponseObject, "After truncation response should be nil")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSplittingBatches(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
desc string
|
||||||
|
config Config
|
||||||
|
events []*auditinternal.Event
|
||||||
|
wantBatchCount int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "Events fitting in one batch should not be split",
|
||||||
|
config: defaultConfig,
|
||||||
|
events: []*auditinternal.Event{{}},
|
||||||
|
wantBatchCount: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "Events not fitting in one batch should be split",
|
||||||
|
config: Config{
|
||||||
|
MaxEventSize: defaultConfig.MaxEventSize,
|
||||||
|
MaxBatchSize: 1,
|
||||||
|
},
|
||||||
|
events: []*auditinternal.Event{
|
||||||
|
{Annotations: map[string]string{"key": strings.Repeat("A", int(50))}},
|
||||||
|
{Annotations: map[string]string{"key": strings.Repeat("A", int(50))}},
|
||||||
|
},
|
||||||
|
wantBatchCount: 2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range testCases {
|
||||||
|
tc := tc
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
var gotBatchCount int
|
||||||
|
fb := &fake.Backend{
|
||||||
|
OnRequest: func(events []*auditinternal.Event) {
|
||||||
|
gotBatchCount++
|
||||||
|
},
|
||||||
|
}
|
||||||
|
b := NewBackend(fb, tc.config, auditv1beta1.SchemeGroupVersion)
|
||||||
|
b.ProcessEvents(tc.events...)
|
||||||
|
|
||||||
|
require.Equal(t, tc.wantBatchCount, gotBatchCount)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue