From 53e0783ab71b96acd6f972110c12d7882ebb12f7 Mon Sep 17 00:00:00 2001 From: Mik Vyatskov Date: Fri, 23 Mar 2018 16:13:34 +0100 Subject: [PATCH] Implemented truncating audit backend Signed-off-by: Mik Vyatskov Kubernetes-commit: 52fae991305e3252ccc5c9c86a9b7abc04c149af --- pkg/server/options/audit.go | 77 +++++++++- pkg/server/options/audit_test.go | 28 ++++ plugin/pkg/audit/buffered/buffered_test.go | 32 +---- plugin/pkg/audit/fake/doc.go | 18 +++ plugin/pkg/audit/fake/fake.go | 46 ++++++ plugin/pkg/audit/truncate/doc.go | 19 +++ plugin/pkg/audit/truncate/truncate.go | 158 +++++++++++++++++++++ plugin/pkg/audit/truncate/truncate_test.go | 141 ++++++++++++++++++ 8 files changed, 488 insertions(+), 31 deletions(-) create mode 100644 plugin/pkg/audit/fake/doc.go create mode 100644 plugin/pkg/audit/fake/fake.go create mode 100644 plugin/pkg/audit/truncate/doc.go create mode 100644 plugin/pkg/audit/truncate/truncate.go create mode 100644 plugin/pkg/audit/truncate/truncate_test.go diff --git a/pkg/server/options/audit.go b/pkg/server/options/audit.go index 164b7ccce..ab7781216 100644 --- a/pkg/server/options/audit.go +++ b/pkg/server/options/audit.go @@ -37,6 +37,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered" pluginlog "k8s.io/apiserver/plugin/pkg/audit/log" + plugintruncate "k8s.io/apiserver/plugin/pkg/audit/truncate" pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook" ) @@ -89,6 +90,14 @@ type AuditBatchOptions struct { BatchConfig pluginbuffered.BatchConfig } +type AuditTruncateOptions struct { + // Whether truncating is enabled or not. + Enabled bool + + // Truncating configuration. + TruncateConfig plugintruncate.Config +} + // AuditLogOptions determines the output of the structured audit log by default. // If the AdvancedAuditing feature is set to false, AuditLogOptions holds the legacy // audit log writer. @@ -99,7 +108,8 @@ type AuditLogOptions struct { MaxSize int Format string - BatchOptions AuditBatchOptions + BatchOptions AuditBatchOptions + TruncateOptions AuditTruncateOptions // API group version used for serializing audit events. GroupVersionString string @@ -110,7 +120,8 @@ type AuditWebhookOptions struct { ConfigFile string InitialBackoff time.Duration - BatchOptions AuditBatchOptions + BatchOptions AuditBatchOptions + TruncateOptions AuditTruncateOptions // API group version used for serializing audit events. GroupVersionString string @@ -122,11 +133,12 @@ func NewAuditOptions() *AuditOptions { return &AuditOptions{ WebhookOptions: AuditWebhookOptions{ + InitialBackoff: pluginwebhook.DefaultInitialBackoff, BatchOptions: AuditBatchOptions{ Mode: ModeBatch, BatchConfig: pluginbuffered.NewDefaultBatchConfig(), }, - InitialBackoff: pluginwebhook.DefaultInitialBackoff, + TruncateOptions: NewAuditTruncateOptions(), GroupVersionString: "audit.k8s.io/v1beta1", }, LogOptions: AuditLogOptions{ @@ -135,11 +147,22 @@ func NewAuditOptions() *AuditOptions { Mode: ModeBlocking, BatchConfig: defaultLogBatchConfig, }, + TruncateOptions: NewAuditTruncateOptions(), GroupVersionString: "audit.k8s.io/v1beta1", }, } } +func NewAuditTruncateOptions() AuditTruncateOptions { + return AuditTruncateOptions{ + Enabled: false, + TruncateConfig: plugintruncate.Config{ + MaxBatchSize: 10 * 1024 * 1024, // 10MB + MaxEventSize: 100 * 1024, // 100KB + }, + } +} + // Validate checks invalid config combination func (o *AuditOptions) Validate() []error { if o == nil { @@ -232,8 +255,10 @@ func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) { o.LogOptions.AddFlags(fs) o.LogOptions.BatchOptions.AddFlags(pluginlog.PluginName, fs) + o.LogOptions.TruncateOptions.AddFlags(pluginlog.PluginName, fs) o.WebhookOptions.AddFlags(fs) o.WebhookOptions.BatchOptions.AddFlags(pluginwebhook.PluginName, fs) + o.WebhookOptions.TruncateOptions.AddFlags(pluginwebhook.PluginName, fs) } func (o *AuditOptions) ApplyTo(c *server.Config) error { @@ -309,6 +334,38 @@ func (o *AuditBatchOptions) wrapBackend(delegate audit.Backend) audit.Backend { return pluginbuffered.NewBackend(delegate, o.BatchConfig) } +func (o *AuditTruncateOptions) Validate(pluginName string) error { + config := o.TruncateConfig + if config.MaxEventSize <= 0 { + return fmt.Errorf("invalid audit truncate %s max event size %v, must be a positive number", pluginName, config.MaxEventSize) + } + if config.MaxBatchSize < config.MaxEventSize { + return fmt.Errorf("invalid audit truncate %s max batch size %v, must be greater than "+ + "max event size (%v)", pluginName, config.MaxBatchSize, config.MaxEventSize) + } + return nil +} + +func (o *AuditTruncateOptions) AddFlags(pluginName string, fs *pflag.FlagSet) { + fs.BoolVar(&o.Enabled, fmt.Sprintf("audit-%s-truncate-enabled", pluginName), + o.Enabled, "Whether event and batch truncating is enabled.") + fs.Int64Var(&o.TruncateConfig.MaxBatchSize, fmt.Sprintf("audit-%s-truncate-max-batch-size", pluginName), + o.TruncateConfig.MaxBatchSize, "Maximum size of the batch sent to the underlying backend. "+ + "Actual serialized size can be several hundreds of bytes greater. If a batch exceeds this limit, "+ + "it is split into several batches of smaller size.") + fs.Int64Var(&o.TruncateConfig.MaxEventSize, fmt.Sprintf("audit-%s-truncate-max-event-size", pluginName), + o.TruncateConfig.MaxEventSize, "Maximum size of the audit event sent to the underlying backend. "+ + "If the size of an event is greater than this number, first request and response are removed, and"+ + "if this doesn't reduce the size enough, event is discarded.") +} + +func (o *AuditTruncateOptions) wrapBackend(delegate audit.Backend, gv schema.GroupVersion) audit.Backend { + if !o.Enabled { + return delegate + } + return plugintruncate.NewBackend(delegate, o.TruncateConfig, gv) +} + func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.Path, "audit-log-path", o.Path, "If set, all requests coming to the apiserver will be logged to this file. '-' means standard out.") @@ -337,6 +394,9 @@ func (o *AuditLogOptions) Validate() []error { if err := validateBackendBatchOptions(pluginlog.PluginName, o.BatchOptions); err != nil { allErrors = append(allErrors, err) } + if err := o.TruncateOptions.Validate(pluginlog.PluginName); err != nil { + allErrors = append(allErrors, err) + } if err := validateGroupVersionString(o.GroupVersionString); err != nil { allErrors = append(allErrors, err) @@ -395,7 +455,9 @@ func (o *AuditLogOptions) advancedApplyTo(c *server.Config) error { if w := o.getWriter(); w != nil { groupVersion, _ := schema.ParseGroupVersion(o.GroupVersionString) log := pluginlog.NewBackend(w, o.Format, groupVersion) - c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(log)) + log = o.BatchOptions.wrapBackend(log) + log = o.TruncateOptions.wrapBackend(log, groupVersion) + c.AuditBackend = appendBackend(c.AuditBackend, log) } return nil } @@ -429,6 +491,9 @@ func (o *AuditWebhookOptions) Validate() []error { if err := validateBackendBatchOptions(pluginwebhook.PluginName, o.BatchOptions); err != nil { allErrors = append(allErrors, err) } + if err := o.TruncateOptions.Validate(pluginwebhook.PluginName); err != nil { + allErrors = append(allErrors, err) + } if err := validateGroupVersionString(o.GroupVersionString); err != nil { allErrors = append(allErrors, err) @@ -451,6 +516,8 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error { if err != nil { return fmt.Errorf("initializing audit webhook: %v", err) } - c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(webhook)) + webhook = o.BatchOptions.wrapBackend(webhook) + webhook = o.TruncateOptions.wrapBackend(webhook, groupVersion) + c.AuditBackend = appendBackend(c.AuditBackend, webhook) return nil } diff --git a/pkg/server/options/audit_test.go b/pkg/server/options/audit_test.go index e125f6b08..78ed4f6a2 100644 --- a/pkg/server/options/audit_test.go +++ b/pkg/server/options/audit_test.go @@ -78,6 +78,15 @@ func TestAuditValidOptions(t *testing.T) { return o }, expected: "union[buffered,webhook]", + }, { + name: "default webhook with truncating", + options: func() *AuditOptions { + o := NewAuditOptions() + o.WebhookOptions.ConfigFile = webhookConfig + o.WebhookOptions.TruncateOptions.Enabled = true + return o + }, + expected: "truncate>", }} for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -148,6 +157,25 @@ func TestAuditInvalidOptions(t *testing.T) { o.WebhookOptions.BatchOptions.BatchConfig.ThrottleQPS = -1 return o }, + }, { + name: "invalid webhook truncate max event size", + options: func() *AuditOptions { + o := NewAuditOptions() + o.WebhookOptions.ConfigFile = "/audit" + o.WebhookOptions.TruncateOptions.Enabled = true + o.WebhookOptions.TruncateOptions.TruncateConfig.MaxEventSize = -1 + return o + }, + }, { + name: "invalid webhook truncate max batch size", + options: func() *AuditOptions { + o := NewAuditOptions() + o.WebhookOptions.ConfigFile = "/audit" + o.WebhookOptions.TruncateOptions.Enabled = true + o.WebhookOptions.TruncateOptions.TruncateConfig.MaxEventSize = 2 + o.WebhookOptions.TruncateOptions.TruncateConfig.MaxBatchSize = 1 + return o + }, }} for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/plugin/pkg/audit/buffered/buffered_test.go b/plugin/pkg/audit/buffered/buffered_test.go index b70996468..c01258e04 100644 --- a/plugin/pkg/audit/buffered/buffered_test.go +++ b/plugin/pkg/audit/buffered/buffered_test.go @@ -24,7 +24,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" auditinternal "k8s.io/apiserver/pkg/apis/audit" - "k8s.io/apiserver/pkg/audit" + "k8s.io/apiserver/plugin/pkg/audit/fake" ) var ( @@ -83,7 +83,7 @@ func TestBufferedBackendCollectEvents(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { t.Parallel() - backend := NewBackend(&fakeBackend{}, config).(*bufferedBackend) + backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend) backend.ProcessEvents(newEvents(tc.numEvents)...) batch := backend.collectEvents(tc.timer, tc.stopCh) @@ -96,7 +96,7 @@ func TestBufferedBackendCollectEvents(t *testing.T) { func TestBufferedBackendProcessEventsAfterStop(t *testing.T) { t.Parallel() - backend := NewBackend(&fakeBackend{}, NewDefaultBatchConfig()).(*bufferedBackend) + backend := NewBackend(&fake.Backend{}, NewDefaultBatchConfig()).(*bufferedBackend) backend.Run(closedStopCh) backend.Shutdown() @@ -111,7 +111,7 @@ func TestBufferedBackendProcessEventsBufferFull(t *testing.T) { config := NewDefaultBatchConfig() config.BufferSize = 1 - backend := NewBackend(&fakeBackend{}, config).(*bufferedBackend) + backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend) backend.ProcessEvents(newEvents(2)...) @@ -123,8 +123,8 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) { delegatedCallStartCh := make(chan struct{}) delegatedCallEndCh := make(chan struct{}) - delegateBackend := &fakeBackend{ - onRequest: func(_ []*auditinternal.Event) { + delegateBackend := &fake.Backend{ + OnRequest: func(_ []*auditinternal.Event) { close(delegatedCallStartCh) <-delegatedCallEndCh }, @@ -159,23 +159,3 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) { close(delegatedCallEndCh) <-shutdownEndCh } - -type fakeBackend struct { - onRequest func(events []*auditinternal.Event) -} - -var _ audit.Backend = &fakeBackend{} - -func (b *fakeBackend) Run(stopCh <-chan struct{}) error { - return nil -} - -func (b *fakeBackend) Shutdown() { - return -} - -func (b *fakeBackend) ProcessEvents(ev ...*auditinternal.Event) { - if b.onRequest != nil { - b.onRequest(ev) - } -} diff --git a/plugin/pkg/audit/fake/doc.go b/plugin/pkg/audit/fake/doc.go new file mode 100644 index 000000000..273947612 --- /dev/null +++ b/plugin/pkg/audit/fake/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package fake provides a fake audit.Backend interface implementation for testing. +package fake // import "k8s.io/apiserver/plugin/pkg/audit/fake" diff --git a/plugin/pkg/audit/fake/fake.go b/plugin/pkg/audit/fake/fake.go new file mode 100644 index 000000000..4b8fa3c7c --- /dev/null +++ b/plugin/pkg/audit/fake/fake.go @@ -0,0 +1,46 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/audit" +) + +var _ audit.Backend = &Backend{} + +// Backend is a fake audit backend for testing purposes. +type Backend struct { + OnRequest func(events []*auditinternal.Event) +} + +// Run does nothing. +func (b *Backend) Run(stopCh <-chan struct{}) error { + return nil +} + +// Shutdown does nothing. +func (b *Backend) Shutdown() { + return +} + +// ProcessEvents calls a callback on a batch, if present. +func (b *Backend) ProcessEvents(ev ...*auditinternal.Event) { + if b.OnRequest != nil { + b.OnRequest(ev) + } +} diff --git a/plugin/pkg/audit/truncate/doc.go b/plugin/pkg/audit/truncate/doc.go new file mode 100644 index 000000000..9392ac314 --- /dev/null +++ b/plugin/pkg/audit/truncate/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package truncate provides an implementation for the audit.Backend interface +// that truncates audit events and sends them to the delegate audit.Backend. +package truncate // import "k8s.io/apiserver/plugin/pkg/audit/truncate" diff --git a/plugin/pkg/audit/truncate/truncate.go b/plugin/pkg/audit/truncate/truncate.go new file mode 100644 index 000000000..79e6876ce --- /dev/null +++ b/plugin/pkg/audit/truncate/truncate.go @@ -0,0 +1,158 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package truncate + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/audit" +) + +const ( + // PluginName is the name reported in error metrics. + PluginName = "truncate" + + // annotationKey defines the name of the annotation used to indicate truncation. + annotationKey = "audit.k8s.io/truncated" + // annotationValue defines the value of the annotation used to indicate truncation. + annotationValue = "true" +) + +// Config represents truncating backend configuration. +type Config struct { + // MaxEventSize defines max allowed size of the event. If the event is larger, + // truncating will be performed. + MaxEventSize int64 + + // MaxBatchSize defined max allowed size of the batch of events, passed to the backend. + // If the total size of the batch is larger than this number, batch will be split. Actual + // size of the serialized request might be slightly higher, on the order of hundreds of bytes. + MaxBatchSize int64 +} + +type backend struct { + // The delegate backend that actually exports events. + delegateBackend audit.Backend + + // Configuration used for truncation. + c Config + + // Encoder used to calculate audit event sizes. + e runtime.Encoder +} + +var _ audit.Backend = &backend{} + +// NewBackend returns a new truncating backend, using configuration passed in the parameters. +func NewBackend(delegateBackend audit.Backend, config Config, groupVersion schema.GroupVersion) audit.Backend { + return &backend{ + delegateBackend: delegateBackend, + c: config, + e: audit.Codecs.LegacyCodec(groupVersion), + } +} + +func (b *backend) ProcessEvents(events ...*auditinternal.Event) { + var errors []error + var impacted []*auditinternal.Event + var batch []*auditinternal.Event + var batchSize int64 + for _, event := range events { + size, err := b.calcSize(event) + // If event was correctly serialized, but the size is more than allowed + // and it makes sense to do trimming, i.e. there's a request and/or + // response present, try to strip away request and response. + if err == nil && size > b.c.MaxEventSize && event.Level.GreaterOrEqual(auditinternal.LevelRequest) { + event = truncate(event) + size, err = b.calcSize(event) + } + if err != nil { + errors = append(errors, err) + impacted = append(impacted, event) + continue + } + if size > b.c.MaxEventSize { + errors = append(errors, fmt.Errorf("event is too large even after truncating")) + impacted = append(impacted, event) + continue + } + + if len(batch) > 0 && batchSize+size > b.c.MaxBatchSize { + b.delegateBackend.ProcessEvents(batch...) + batch = []*auditinternal.Event{} + batchSize = 0 + } + + batchSize += size + batch = append(batch, event) + } + + if len(batch) > 0 { + b.delegateBackend.ProcessEvents(batch...) + } + + if len(impacted) > 0 { + audit.HandlePluginError(PluginName, utilerrors.NewAggregate(errors), impacted...) + } +} + +// truncate removed request and response objects from the audit events, +// to try and keep at least metadata. +func truncate(e *auditinternal.Event) *auditinternal.Event { + // Make a shallow copy to avoid copying response/request objects. + newEvent := &auditinternal.Event{} + *newEvent = *e + + newEvent.RequestObject = nil + newEvent.ResponseObject = nil + audit.LogAnnotation(newEvent, annotationKey, annotationValue) + return newEvent +} + +func (b *backend) Run(stopCh <-chan struct{}) error { + // Nothing to do here + return nil +} + +func (b *backend) Shutdown() { + // Nothing to do here +} + +func (b *backend) calcSize(e *auditinternal.Event) (int64, error) { + s := &sizer{} + if err := b.e.Encode(e, s); err != nil { + return 0, err + } + return s.Size, nil +} + +func (b *backend) String() string { + return fmt.Sprintf("%s<%s>", PluginName, b.delegateBackend) +} + +type sizer struct { + Size int64 +} + +func (s *sizer) Write(p []byte) (n int, err error) { + s.Size += int64(len(p)) + return len(p), nil +} diff --git a/plugin/pkg/audit/truncate/truncate_test.go b/plugin/pkg/audit/truncate/truncate_test.go new file mode 100644 index 000000000..9b0d5c142 --- /dev/null +++ b/plugin/pkg/audit/truncate/truncate_test.go @@ -0,0 +1,141 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package truncate + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "k8s.io/apimachinery/pkg/runtime" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + auditv1beta1 "k8s.io/apiserver/pkg/apis/audit/v1beta1" + "k8s.io/apiserver/plugin/pkg/audit/fake" + // Importing just for the schema definitions. + _ "k8s.io/apiserver/plugin/pkg/audit/webhook" +) + +var ( + defaultConfig = Config{ + MaxBatchSize: 4 * 1024 * 1024, + MaxEventSize: 100 * 1024, + } +) + +func TestTruncatingEvents(t *testing.T) { + testCases := []struct { + desc string + event *auditinternal.Event + wantDropped bool + wantTruncated bool + }{ + { + desc: "Empty event should not be truncated", + event: &auditinternal.Event{}, + }, + { + desc: "Event with too large body should be truncated", + event: &auditinternal.Event{ + Level: auditinternal.LevelRequest, + RequestObject: &runtime.Unknown{ + Raw: []byte("\"" + strings.Repeat("A", int(defaultConfig.MaxEventSize)) + "\""), + }, + }, + wantTruncated: true, + }, + { + desc: "Event with too large metadata should be dropped", + event: &auditinternal.Event{ + Annotations: map[string]string{ + "key": strings.Repeat("A", int(defaultConfig.MaxEventSize)), + }, + }, + wantDropped: true, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + var event *auditinternal.Event + + fb := &fake.Backend{ + OnRequest: func(events []*auditinternal.Event) { + require.Equal(t, 1, len(events), "Expected single event in batch") + event = events[0] + }, + } + b := NewBackend(fb, defaultConfig, auditv1beta1.SchemeGroupVersion) + b.ProcessEvents(tc.event) + + require.Equal(t, !tc.wantDropped, event != nil, "Incorrect event presence") + if tc.wantTruncated { + require.Equal(t, annotationValue, event.Annotations[annotationKey], "Annotation should be present") + require.Nil(t, event.RequestObject, "After truncation request should be nil") + require.Nil(t, event.ResponseObject, "After truncation response should be nil") + } + }) + } +} + +func TestSplittingBatches(t *testing.T) { + testCases := []struct { + desc string + config Config + events []*auditinternal.Event + wantBatchCount int + }{ + { + desc: "Events fitting in one batch should not be split", + config: defaultConfig, + events: []*auditinternal.Event{{}}, + wantBatchCount: 1, + }, + { + desc: "Events not fitting in one batch should be split", + config: Config{ + MaxEventSize: defaultConfig.MaxEventSize, + MaxBatchSize: 1, + }, + events: []*auditinternal.Event{ + {Annotations: map[string]string{"key": strings.Repeat("A", int(50))}}, + {Annotations: map[string]string{"key": strings.Repeat("A", int(50))}}, + }, + wantBatchCount: 2, + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + var gotBatchCount int + fb := &fake.Backend{ + OnRequest: func(events []*auditinternal.Event) { + gotBatchCount++ + }, + } + b := NewBackend(fb, tc.config, auditv1beta1.SchemeGroupVersion) + b.ProcessEvents(tc.events...) + + require.Equal(t, tc.wantBatchCount, gotBatchCount) + }) + } +}