From f3b69c3f89ac8c724f9f73bee1dcd0e69da2c1be Mon Sep 17 00:00:00 2001 From: Patrick Barker Date: Thu, 18 Oct 2018 21:34:02 -0500 Subject: [PATCH] adds dynamic audit plugins Kubernetes-commit: 8eb2150689159bd011aec189cf77e5b15fbcb22b --- pkg/audit/event/attributes.go | 147 ++++++++ pkg/audit/event/attributes_test.go | 96 +++++ pkg/audit/policy/dynamic.go | 39 ++ pkg/audit/policy/dynamic_test.go | 81 +++++ pkg/audit/policy/enforce.go | 53 +++ pkg/audit/policy/enforce_test.go | 146 ++++++++ pkg/audit/policy/util.go | 68 ++++ pkg/audit/policy/util_test.go | 81 +++++ pkg/audit/util/conversion.go | 43 +++ pkg/audit/util/conversion_test.go | 88 +++++ plugin/pkg/audit/dynamic/defaults.go | 46 +++ plugin/pkg/audit/dynamic/dynamic.go | 335 ++++++++++++++++++ plugin/pkg/audit/dynamic/dynamic_test.go | 275 ++++++++++++++ plugin/pkg/audit/dynamic/enforced/enforced.go | 90 +++++ .../audit/dynamic/enforced/enforced_test.go | 117 ++++++ plugin/pkg/audit/dynamic/factory.go | 91 +++++ plugin/pkg/audit/dynamic/factory_test.go | 146 ++++++++ plugin/pkg/audit/webhook/webhook.go | 22 +- 18 files changed, 1960 insertions(+), 4 deletions(-) create mode 100644 pkg/audit/event/attributes.go create mode 100644 pkg/audit/event/attributes_test.go create mode 100644 pkg/audit/policy/dynamic.go create mode 100644 pkg/audit/policy/dynamic_test.go create mode 100644 pkg/audit/policy/enforce.go create mode 100644 pkg/audit/policy/enforce_test.go create mode 100644 pkg/audit/policy/util.go create mode 100644 pkg/audit/policy/util_test.go create mode 100644 pkg/audit/util/conversion.go create mode 100644 pkg/audit/util/conversion_test.go create mode 100644 plugin/pkg/audit/dynamic/defaults.go create mode 100644 plugin/pkg/audit/dynamic/dynamic.go create mode 100644 plugin/pkg/audit/dynamic/dynamic_test.go create mode 100644 plugin/pkg/audit/dynamic/enforced/enforced.go create mode 100644 plugin/pkg/audit/dynamic/enforced/enforced_test.go create mode 100644 plugin/pkg/audit/dynamic/factory.go create mode 100644 plugin/pkg/audit/dynamic/factory_test.go diff --git a/pkg/audit/event/attributes.go b/pkg/audit/event/attributes.go new file mode 100644 index 000000000..576b8db84 --- /dev/null +++ b/pkg/audit/event/attributes.go @@ -0,0 +1,147 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package event + +import ( + "fmt" + "net/url" + + "k8s.io/apiserver/pkg/apis/audit" + authuser "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/authorization/authorizer" +) + +var _ authorizer.Attributes = &attributes{} + +// attributes implements the authorizer attributes interface +// with event data. This is used for enforced audit backends +type attributes struct { + event *audit.Event + path string +} + +// NewAttributes returns a new attributes struct and parsed request uri +// if needed +func NewAttributes(event *audit.Event) (authorizer.Attributes, error) { + a := attributes{ + event: event, + } + if event.ObjectRef == nil { + u, err := url.ParseRequestURI(a.event.RequestURI) + if err != nil { + return nil, fmt.Errorf("could not parse url: %v", err) + } + a.path = u.Path + } + return &a, nil +} + +// GetUser returns the user. This is only used for checking audit policy, +// and the audit policy user check is based off the original user, +// not the impersonated user. +func (a *attributes) GetUser() authuser.Info { + return user(a.event.User) +} + +// GetVerb returns the verb +func (a *attributes) GetVerb() string { + return a.event.Verb +} + +// IsReadOnly determines if the verb is a read only action +func (a *attributes) IsReadOnly() bool { + return a.event.Verb == "get" || a.event.Verb == "list" || a.event.Verb == "watch" +} + +// GetNamespace returns the object namespace if present +func (a *attributes) GetNamespace() string { + if a.event.ObjectRef == nil { + return "" + } + return a.event.ObjectRef.Namespace +} + +// GetResource returns the object resource if present +func (a *attributes) GetResource() string { + if a.event.ObjectRef == nil { + return "" + } + return a.event.ObjectRef.Resource +} + +// GetSubresource returns the object subresource if present +func (a *attributes) GetSubresource() string { + if a.event.ObjectRef == nil { + return "" + } + return a.event.ObjectRef.Subresource +} + +// GetName returns the object name if present +func (a *attributes) GetName() string { + if a.event.ObjectRef == nil { + return "" + } + return a.event.ObjectRef.Name +} + +// GetAPIGroup returns the object api group if present +func (a *attributes) GetAPIGroup() string { + if a.event.ObjectRef == nil { + return "" + } + return a.event.ObjectRef.APIGroup +} + +// GetAPIVersion returns the object api version if present +func (a *attributes) GetAPIVersion() string { + if a.event.ObjectRef == nil { + return "" + } + return a.event.ObjectRef.APIVersion +} + +// IsResourceRequest determines if the request was acted on a resource +func (a *attributes) IsResourceRequest() bool { + return a.event.ObjectRef != nil +} + +// GetPath returns the path uri accessed +func (a *attributes) GetPath() string { + return a.path +} + +// user represents the event user +type user audit.UserInfo + +// GetName returns the user name +func (u user) GetName() string { return u.Username } + +// GetUID returns the user uid +func (u user) GetUID() string { return u.UID } + +// GetGroups returns the user groups +func (u user) GetGroups() []string { return u.Groups } + +// GetExtra returns the user extra data +func (u user) GetExtra() map[string][]string { + m := map[string][]string{} + for k, v := range u.Extra { + m[k] = []string(v) + } + return m +} diff --git a/pkg/audit/event/attributes_test.go b/pkg/audit/event/attributes_test.go new file mode 100644 index 000000000..e97dbf45b --- /dev/null +++ b/pkg/audit/event/attributes_test.go @@ -0,0 +1,96 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package event + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "k8s.io/apiserver/pkg/apis/audit" +) + +func TestAttributes(t *testing.T) { + for _, tc := range []struct { + desc string + ev *audit.Event + path string + isReadOnly bool + resourceName string + shouldErr bool + }{ + { + desc: "has resources", + ev: &audit.Event{ + Verb: "get", + ObjectRef: &audit.ObjectReference{ + Resource: "pod", + Name: "mypod", + Namespace: "test", + }, + RequestURI: "/api/v1/namespaces/test/pods", + }, + path: "", + isReadOnly: true, + resourceName: "mypod", + shouldErr: false, + }, + { + desc: "no resources", + ev: &audit.Event{ + Verb: "create", + RequestURI: "/api/v1/namespaces/test/pods", + }, + path: "/api/v1/namespaces/test/pods", + isReadOnly: false, + resourceName: "", + shouldErr: false, + }, + { + desc: "no path or resource", + ev: &audit.Event{ + Verb: "create", + }, + path: "", + isReadOnly: false, + resourceName: "", + shouldErr: true, + }, + { + desc: "invalid path", + ev: &audit.Event{ + Verb: "create", + }, + path: "a/bad/path", + isReadOnly: false, + resourceName: "", + shouldErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + attr, err := NewAttributes(tc.ev) + if tc.shouldErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.path, attr.GetPath()) + require.Equal(t, tc.isReadOnly, attr.IsReadOnly()) + require.Equal(t, tc.resourceName, attr.GetName()) + }) + } +} diff --git a/pkg/audit/policy/dynamic.go b/pkg/audit/policy/dynamic.go new file mode 100644 index 000000000..72add7a46 --- /dev/null +++ b/pkg/audit/policy/dynamic.go @@ -0,0 +1,39 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package policy + +import ( + "k8s.io/api/auditregistration/v1alpha1" + "k8s.io/apiserver/pkg/apis/audit" +) + +// ConvertDynamicPolicyToInternal constructs an internal policy type from a +// v1alpha1 dynamic type +func ConvertDynamicPolicyToInternal(p *v1alpha1.Policy) *audit.Policy { + stages := make([]audit.Stage, len(p.Stages)) + for i, stage := range p.Stages { + stages[i] = audit.Stage(stage) + } + return &audit.Policy{ + Rules: []audit.PolicyRule{ + { + Level: audit.Level(p.Level), + }, + }, + OmitStages: InvertStages(stages), + } +} diff --git a/pkg/audit/policy/dynamic_test.go b/pkg/audit/policy/dynamic_test.go new file mode 100644 index 000000000..369716001 --- /dev/null +++ b/pkg/audit/policy/dynamic_test.go @@ -0,0 +1,81 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package policy + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "k8s.io/api/auditregistration/v1alpha1" + "k8s.io/apiserver/pkg/apis/audit" +) + +func TestConvertDynamicPolicyToInternal(t *testing.T) { + for _, test := range []struct { + desc string + dynamic *v1alpha1.Policy + internal *audit.Policy + }{ + { + desc: "should convert full", + dynamic: &v1alpha1.Policy{ + Level: v1alpha1.LevelMetadata, + Stages: []v1alpha1.Stage{ + v1alpha1.StageResponseComplete, + }, + }, + internal: &audit.Policy{ + Rules: []audit.PolicyRule{ + { + Level: audit.LevelMetadata, + }, + }, + OmitStages: []audit.Stage{ + audit.StageRequestReceived, + audit.StageResponseStarted, + audit.StagePanic, + }, + }, + }, + { + desc: "should convert missing stages", + dynamic: &v1alpha1.Policy{ + Level: v1alpha1.LevelMetadata, + }, + internal: &audit.Policy{ + Rules: []audit.PolicyRule{ + { + Level: audit.LevelMetadata, + }, + }, + OmitStages: []audit.Stage{ + audit.StageRequestReceived, + audit.StageResponseStarted, + audit.StageResponseComplete, + audit.StagePanic, + }, + }, + }, + } { + t.Run(test.desc, func(t *testing.T) { + d := ConvertDynamicPolicyToInternal(test.dynamic) + require.ElementsMatch(t, test.internal.OmitStages, d.OmitStages) + require.Equal(t, test.internal.Rules, d.Rules) + }) + } +} diff --git a/pkg/audit/policy/enforce.go b/pkg/audit/policy/enforce.go new file mode 100644 index 000000000..e2b107b9f --- /dev/null +++ b/pkg/audit/policy/enforce.go @@ -0,0 +1,53 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package policy + +import ( + "fmt" + + "k8s.io/apiserver/pkg/apis/audit" +) + +// EnforcePolicy drops any part of the event that doesn't conform to a policy level +// or omitStages and sets the event level accordingly +func EnforcePolicy(event *audit.Event, level audit.Level, omitStages []audit.Stage) (*audit.Event, error) { + for _, stage := range omitStages { + if event.Stage == stage { + return nil, nil + } + } + return enforceLevel(event, level) +} + +func enforceLevel(event *audit.Event, level audit.Level) (*audit.Event, error) { + switch level { + case audit.LevelMetadata: + event.Level = audit.LevelMetadata + event.ResponseObject = nil + event.RequestObject = nil + case audit.LevelRequest: + event.Level = audit.LevelRequest + event.ResponseObject = nil + case audit.LevelRequestResponse: + event.Level = audit.LevelRequestResponse + case audit.LevelNone: + return nil, nil + default: + return nil, fmt.Errorf("level unknown: %s", level) + } + return event, nil +} diff --git a/pkg/audit/policy/enforce_test.go b/pkg/audit/policy/enforce_test.go new file mode 100644 index 000000000..b7fd775a1 --- /dev/null +++ b/pkg/audit/policy/enforce_test.go @@ -0,0 +1,146 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package policy + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "k8s.io/apimachinery/pkg/api/apitesting/fuzzer" + "k8s.io/apimachinery/pkg/runtime" + runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/apis/audit" + auditfuzz "k8s.io/apiserver/pkg/apis/audit/fuzzer" +) + +func TestEnforcePolicy(t *testing.T) { + scheme := runtime.NewScheme() + audit.SchemeBuilder.AddToScheme(scheme) + codecs := runtimeserializer.NewCodecFactory(scheme) + rs := rand.NewSource(time.Now().UnixNano()) + objectFuzzer := fuzzer.FuzzerFor(auditfuzz.Funcs, rs, codecs) + + for _, tc := range []struct { + name string + level audit.Level + omitStages []audit.Stage + }{ + { + name: "level metadata", + level: audit.LevelMetadata, + }, + { + name: "level request", + level: audit.LevelRequest, + }, + { + name: "level requestresponse", + level: audit.LevelRequestResponse, + }, + { + name: "level none", + level: audit.LevelNone, + }, + { + name: "level unknown", + level: audit.Level("unknown"), + }, + { + name: "stage valid", + level: audit.LevelRequest, + omitStages: []audit.Stage{audit.StageRequestReceived}, + }, + { + name: "stage unknown", + level: audit.LevelRequest, + omitStages: []audit.Stage{"unknown"}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + events := make([]audit.Event, 20) + omitSet := sets.NewString(ConvertStagesToStrings(tc.omitStages)...) + for i := range events { + e := &events[i] + objectFuzzer.Fuzz(e) + ev, err := EnforcePolicy(e, tc.level, tc.omitStages) + if omitSet.Has(string(e.Stage)) { + require.Nil(t, err) + require.Nil(t, ev) + return + } + switch tc.level { + case audit.LevelNone: + require.Nil(t, ev) + case audit.LevelMetadata: + expected := &audit.Event{ + TypeMeta: e.TypeMeta, + Level: tc.level, + AuditID: e.AuditID, + Stage: e.Stage, + RequestURI: e.RequestURI, + Verb: e.Verb, + User: e.User, + ImpersonatedUser: e.ImpersonatedUser, + SourceIPs: e.SourceIPs, + UserAgent: e.UserAgent, + ObjectRef: e.ObjectRef, + ResponseStatus: e.ResponseStatus, + RequestReceivedTimestamp: e.RequestReceivedTimestamp, + StageTimestamp: e.StageTimestamp, + Annotations: e.Annotations, + RequestObject: nil, + ResponseObject: nil, + } + require.Equal(t, expected, ev) + case audit.LevelRequest: + expected := &audit.Event{ + TypeMeta: e.TypeMeta, + Level: tc.level, + AuditID: e.AuditID, + Stage: e.Stage, + RequestURI: e.RequestURI, + Verb: e.Verb, + User: e.User, + ImpersonatedUser: e.ImpersonatedUser, + SourceIPs: e.SourceIPs, + UserAgent: e.UserAgent, + ObjectRef: e.ObjectRef, + ResponseStatus: e.ResponseStatus, + RequestReceivedTimestamp: e.RequestReceivedTimestamp, + StageTimestamp: e.StageTimestamp, + Annotations: e.Annotations, + RequestObject: e.RequestObject, + ResponseObject: nil, + } + require.Equal(t, expected, ev) + case audit.LevelRequestResponse: + expected := e.DeepCopy() + expected.Level = tc.level + require.Equal(t, expected, ev) + default: + require.NotNil(t, err) + return + } + require.Nil(t, err) + } + }) + } +} diff --git a/pkg/audit/policy/util.go b/pkg/audit/policy/util.go new file mode 100644 index 000000000..29be91230 --- /dev/null +++ b/pkg/audit/policy/util.go @@ -0,0 +1,68 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package policy + +import ( + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/apis/audit" +) + +// AllStages returns all possible stages +func AllStages() sets.String { + return sets.NewString( + audit.StageRequestReceived, + audit.StageResponseStarted, + audit.StageResponseComplete, + audit.StagePanic, + ) +} + +// AllLevels returns all possible levels +func AllLevels() sets.String { + return sets.NewString( + string(audit.LevelNone), + string(audit.LevelMetadata), + string(audit.LevelRequest), + string(audit.LevelRequestResponse), + ) +} + +// InvertStages subtracts the given array of stages from all stages +func InvertStages(stages []audit.Stage) []audit.Stage { + s := ConvertStagesToStrings(stages) + a := AllStages() + a.Delete(s...) + return ConvertStringSetToStages(a) +} + +// ConvertStagesToStrings converts an array of stages to a string array +func ConvertStagesToStrings(stages []audit.Stage) []string { + s := make([]string, len(stages)) + for i, stage := range stages { + s[i] = string(stage) + } + return s +} + +// ConvertStringSetToStages converts a string set to an array of stages +func ConvertStringSetToStages(set sets.String) []audit.Stage { + stages := make([]audit.Stage, len(set)) + for i, stage := range set.List() { + stages[i] = audit.Stage(stage) + } + return stages +} diff --git a/pkg/audit/policy/util_test.go b/pkg/audit/policy/util_test.go new file mode 100644 index 000000000..3181b03a6 --- /dev/null +++ b/pkg/audit/policy/util_test.go @@ -0,0 +1,81 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package policy + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "k8s.io/apiserver/pkg/apis/audit" +) + +func TestInvertStages(t *testing.T) { + for _, test := range []struct { + desc string + stages []audit.Stage + expectedStages []audit.Stage + }{ + { + desc: "should remove one", + stages: []audit.Stage{ + audit.StageResponseStarted, + }, + expectedStages: []audit.Stage{ + audit.StageRequestReceived, + audit.StageResponseComplete, + audit.StagePanic, + }, + }, + { + desc: "should remove both", + stages: []audit.Stage{ + audit.StageResponseStarted, + audit.StageRequestReceived, + }, + expectedStages: []audit.Stage{ + audit.StageResponseComplete, + audit.StagePanic, + }, + }, + { + desc: "should remove none", + stages: []audit.Stage{}, + expectedStages: []audit.Stage{ + audit.StageResponseComplete, + audit.StageResponseStarted, + audit.StageRequestReceived, + audit.StagePanic, + }, + }, + { + desc: "should remove all", + stages: []audit.Stage{ + audit.StageResponseComplete, + audit.StageResponseStarted, + audit.StageRequestReceived, + audit.StagePanic, + }, + expectedStages: []audit.Stage{}, + }, + } { + t.Run(test.desc, func(t *testing.T) { + e := InvertStages(test.stages) + require.ElementsMatch(t, e, test.expectedStages) + }) + } +} diff --git a/pkg/audit/util/conversion.go b/pkg/audit/util/conversion.go new file mode 100644 index 000000000..6b1f35c43 --- /dev/null +++ b/pkg/audit/util/conversion.go @@ -0,0 +1,43 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "k8s.io/api/auditregistration/v1alpha1" + "k8s.io/apiserver/pkg/util/webhook" +) + +// HookClientConfigForSink constructs a webhook.ClientConfig using a v1alpha1.AuditSink API object. +// webhook.ClientConfig is used to create a HookClient and the purpose of the config struct is to +// share that with other packages that need to create a HookClient. +func HookClientConfigForSink(a *v1alpha1.AuditSink) webhook.ClientConfig { + c := a.Spec.Webhook.ClientConfig + ret := webhook.ClientConfig{Name: a.Name, CABundle: c.CABundle} + if c.URL != nil { + ret.URL = *c.URL + } + if c.Service != nil { + ret.Service = &webhook.ClientConfigService{ + Name: c.Service.Name, + Namespace: c.Service.Namespace, + } + if c.Service.Path != nil { + ret.Service.Path = *c.Service.Path + } + } + return ret +} diff --git a/pkg/audit/util/conversion_test.go b/pkg/audit/util/conversion_test.go new file mode 100644 index 000000000..4cd23c13d --- /dev/null +++ b/pkg/audit/util/conversion_test.go @@ -0,0 +1,88 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + + "github.com/stretchr/testify/require" + + auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/util/webhook" +) + +func TestHookClientConfigForSink(t *testing.T) { + testURL := "http://localhost" + path := "/path" + for _, tc := range []struct { + desc string + sink *auditregv1alpha1.AuditSink + clientConfig webhook.ClientConfig + }{ + { + desc: "build full", + sink: &auditregv1alpha1.AuditSink{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: auditregv1alpha1.AuditSinkSpec{ + Webhook: auditregv1alpha1.Webhook{ + ClientConfig: auditregv1alpha1.WebhookClientConfig{ + URL: &testURL, + Service: &auditregv1alpha1.ServiceReference{ + Name: "test", + Path: &path, + Namespace: "test", + }, + }, + }, + }, + }, + clientConfig: webhook.ClientConfig{ + Name: "test", + URL: testURL, + Service: &webhook.ClientConfigService{ + Name: "test", + Namespace: "test", + Path: path, + }, + }, + }, + { + desc: "build empty client config", + sink: &auditregv1alpha1.AuditSink{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: auditregv1alpha1.AuditSinkSpec{ + Webhook: auditregv1alpha1.Webhook{ + ClientConfig: auditregv1alpha1.WebhookClientConfig{}, + }, + }, + }, + clientConfig: webhook.ClientConfig{ + Name: "test", + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ret := HookClientConfigForSink(tc.sink) + require.Equal(t, tc.clientConfig, ret) + }) + } +} diff --git a/plugin/pkg/audit/dynamic/defaults.go b/plugin/pkg/audit/dynamic/defaults.go new file mode 100644 index 000000000..f442954b5 --- /dev/null +++ b/plugin/pkg/audit/dynamic/defaults.go @@ -0,0 +1,46 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamic + +import ( + "time" + + bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered" +) + +const ( + // Default configuration values for ModeBatch when applied to a dynamic plugin + defaultBatchBufferSize = 5000 // Buffer up to 5000 events before starting discarding. + defaultBatchMaxSize = 400 // Only send up to 400 events at a time. + defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute. + defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS. + defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst. +) + +// NewDefaultWebhookBatchConfig returns new Batch Config objects populated by default values +// for dynamic webhooks +func NewDefaultWebhookBatchConfig() *bufferedplugin.BatchConfig { + return &bufferedplugin.BatchConfig{ + BufferSize: defaultBatchBufferSize, + MaxBatchSize: defaultBatchMaxSize, + MaxBatchWait: defaultBatchMaxWait, + ThrottleEnable: true, + ThrottleQPS: defaultBatchThrottleQPS, + ThrottleBurst: defaultBatchThrottleBurst, + AsyncDelegate: true, + } +} diff --git a/plugin/pkg/audit/dynamic/dynamic.go b/plugin/pkg/audit/dynamic/dynamic.go new file mode 100644 index 000000000..498737ab3 --- /dev/null +++ b/plugin/pkg/audit/dynamic/dynamic.go @@ -0,0 +1,335 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamic + +import ( + "fmt" + "reflect" + "strings" + "sync" + "sync/atomic" + + "github.com/golang/glog" + + auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + auditinstall "k8s.io/apiserver/pkg/apis/audit/install" + auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" + "k8s.io/apiserver/pkg/audit" + webhook "k8s.io/apiserver/pkg/util/webhook" + bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered" + auditinformer "k8s.io/client-go/informers/auditregistration/v1alpha1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" +) + +// PluginName is the name reported in error metrics. +const PluginName = "dynamic" + +// Config holds the configuration for the dynamic backend +type Config struct { + // Informer for the audit sinks + Informer auditinformer.AuditSinkInformer + // EventConfig holds the configuration for event notifications about the AuditSink API objects + EventConfig EventConfig + // BufferedConfig is the runtime buffered configuration + BufferedConfig *bufferedplugin.BatchConfig + // WebhookConfig holds the configuration for outgoing webhooks + WebhookConfig WebhookConfig +} + +// WebhookConfig holds the configurations for outgoing webhooks +type WebhookConfig struct { + // AuthInfoResolverWrapper provides the webhook authentication for in-cluster endpoints + AuthInfoResolverWrapper webhook.AuthenticationInfoResolverWrapper + // ServiceResolver knows how to convert a webhook service reference into an actual location. + ServiceResolver webhook.ServiceResolver +} + +// EventConfig holds the configurations for sending event notifiations about AuditSink API objects +type EventConfig struct { + // Sink for emitting events + Sink record.EventSink + // Source holds the source information about the event emitter + Source corev1.EventSource +} + +// delegate represents a delegate backend that was created from an audit sink configuration +type delegate struct { + audit.Backend + configuration *auditregv1alpha1.AuditSink + stopChan chan struct{} +} + +// gracefulShutdown will gracefully shutdown the delegate +func (d *delegate) gracefulShutdown() { + close(d.stopChan) + d.Shutdown() +} + +// NewBackend returns a backend that dynamically updates its configuration +// based on a shared informer. +func NewBackend(c *Config) (audit.Backend, error) { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(c.EventConfig.Sink) + + scheme := runtime.NewScheme() + err := auditregv1alpha1.AddToScheme(scheme) + if err != nil { + return nil, err + } + recorder := eventBroadcaster.NewRecorder(scheme, c.EventConfig.Source) + + if c.BufferedConfig == nil { + c.BufferedConfig = NewDefaultWebhookBatchConfig() + } + cm, err := webhook.NewClientManager(auditv1.SchemeGroupVersion, func(s *runtime.Scheme) error { + auditinstall.Install(s) + return nil + }) + if err != nil { + return nil, err + } + + // TODO: need a way of injecting authentication before beta + authInfoResolver, err := webhook.NewDefaultAuthenticationInfoResolver("") + if err != nil { + return nil, err + } + cm.SetAuthenticationInfoResolver(authInfoResolver) + cm.SetServiceResolver(c.WebhookConfig.ServiceResolver) + cm.SetAuthenticationInfoResolverWrapper(c.WebhookConfig.AuthInfoResolverWrapper) + + manager := &backend{ + config: c, + delegates: atomic.Value{}, + delegateUpdateMutex: sync.Mutex{}, + webhookClientManager: cm, + recorder: recorder, + } + manager.delegates.Store(syncedDelegates{}) + + c.Informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + manager.addSink(obj.(*auditregv1alpha1.AuditSink)) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + manager.updateSink(oldObj.(*auditregv1alpha1.AuditSink), newObj.(*auditregv1alpha1.AuditSink)) + }, + DeleteFunc: func(obj interface{}) { + sink, ok := obj.(*auditregv1alpha1.AuditSink) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.V(2).Infof("Couldn't get object from tombstone %#v", obj) + return + } + sink, ok = tombstone.Obj.(*auditregv1alpha1.AuditSink) + if !ok { + glog.V(2).Infof("Tombstone contained object that is not an AuditSink: %#v", obj) + return + } + } + manager.deleteSink(sink) + }, + }) + + return manager, nil +} + +type backend struct { + // delegateUpdateMutex holds an update lock on the delegates + delegateUpdateMutex sync.Mutex + config *Config + delegates atomic.Value + webhookClientManager webhook.ClientManager + recorder record.EventRecorder +} + +type syncedDelegates map[types.UID]*delegate + +// Names returns the names of the delegate configurations +func (s syncedDelegates) Names() []string { + names := []string{} + for _, delegate := range s { + names = append(names, delegate.configuration.Name) + } + return names +} + +// ProcessEvents proccesses the given events per current delegate map +func (b *backend) ProcessEvents(events ...*auditinternal.Event) { + for _, d := range b.GetDelegates() { + d.ProcessEvents(events...) + } +} + +// Run starts a goroutine that propagates the shutdown signal, +// individual delegates are ran as they are created. +func (b *backend) Run(stopCh <-chan struct{}) error { + go func() { + <-stopCh + b.stopAllDelegates() + }() + return nil +} + +// stopAllDelegates closes the stopChan for every delegate to enable +// goroutines to terminate gracefully. This is a helper method to propagate +// the primary stopChan to the current delegate map. +func (b *backend) stopAllDelegates() { + b.delegateUpdateMutex.Lock() + for _, d := range b.GetDelegates() { + close(d.stopChan) + } +} + +// Shutdown calls the shutdown method on all delegates. The stopChan should +// be closed before this is called. +func (b *backend) Shutdown() { + for _, d := range b.GetDelegates() { + d.Shutdown() + } +} + +// GetDelegates retrieves current delegates in a safe manner +func (b *backend) GetDelegates() syncedDelegates { + return b.delegates.Load().(syncedDelegates) +} + +// copyDelegates returns a copied delegate map +func (b *backend) copyDelegates() syncedDelegates { + c := make(syncedDelegates) + for u, s := range b.GetDelegates() { + c[u] = s + } + return c +} + +// setDelegates sets the current delegates in a safe manner +func (b *backend) setDelegates(delegates syncedDelegates) { + b.delegates.Store(delegates) +} + +// addSink is called by the shared informer when a sink is added +func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) { + b.delegateUpdateMutex.Lock() + defer b.delegateUpdateMutex.Unlock() + delegates := b.copyDelegates() + if _, ok := delegates[sink.UID]; ok { + glog.Errorf("Audit sink %q uid: %s already exists, could not readd", sink.Name, sink.UID) + return + } + d, err := b.createAndStartDelegate(sink) + if err != nil { + msg := fmt.Sprintf("Could not add audit sink %q: %v", sink.Name, err) + glog.Error(msg) + b.recorder.Event(sink, corev1.EventTypeWarning, "CreateFailed", msg) + return + } + delegates[sink.UID] = d + b.setDelegates(delegates) + glog.V(2).Infof("Added audit sink: %s", sink.Name) + glog.V(2).Infof("Current audit sinks: %v", delegates.Names()) +} + +// updateSink is called by the shared informer when a sink is updated. +// The new sink is only rebuilt on spec changes. The new sink must not have +// the same uid as the previous. The new sink will be started before the old +// one is shutdown so no events will be lost +func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) { + b.delegateUpdateMutex.Lock() + defer b.delegateUpdateMutex.Unlock() + delegates := b.copyDelegates() + oldDelegate, ok := delegates[oldSink.UID] + if !ok { + glog.Errorf("Could not update audit sink %q uid: %s, old sink does not exist", + oldSink.Name, oldSink.UID) + return + } + + // check if spec has changed + eq := reflect.DeepEqual(oldSink.Spec, newSink.Spec) + if eq { + delete(delegates, oldSink.UID) + delegates[newSink.UID] = oldDelegate + b.setDelegates(delegates) + } else { + d, err := b.createAndStartDelegate(newSink) + if err != nil { + msg := fmt.Sprintf("Could not update audit sink %q: %v", oldSink.Name, err) + glog.Error(msg) + b.recorder.Event(newSink, corev1.EventTypeWarning, "UpdateFailed", msg) + return + } + delete(delegates, oldSink.UID) + delegates[newSink.UID] = d + b.setDelegates(delegates) + oldDelegate.gracefulShutdown() + } + + glog.V(2).Infof("Updated audit sink: %s", newSink.Name) + glog.V(2).Infof("Current audit sinks: %v", delegates.Names()) +} + +// deleteSink is called by the shared informer when a sink is deleted +func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) { + b.delegateUpdateMutex.Lock() + defer b.delegateUpdateMutex.Unlock() + delegates := b.copyDelegates() + delegate, ok := delegates[sink.UID] + if !ok { + glog.Errorf("Could not delete audit sink %q uid: %s, does not exist", sink.Name, sink.UID) + return + } + delete(delegates, sink.UID) + b.setDelegates(delegates) + delegate.gracefulShutdown() + glog.V(2).Infof("Deleted audit sink: %s", sink.Name) + glog.V(2).Infof("Current audit sinks: %v", delegates.Names()) +} + +// createAndStartDelegate will build a delegate from an audit sink configuration and run it +func (b *backend) createAndStartDelegate(sink *auditregv1alpha1.AuditSink) (*delegate, error) { + f := factory{ + config: b.config, + webhookClientManager: b.webhookClientManager, + sink: sink, + } + delegate, err := f.BuildDelegate() + if err != nil { + return nil, err + } + err = delegate.Run(delegate.stopChan) + if err != nil { + return nil, err + } + return delegate, nil +} + +// String returns a string representation of the backend +func (b *backend) String() string { + var delegateStrings []string + for _, delegate := range b.GetDelegates() { + delegateStrings = append(delegateStrings, fmt.Sprintf("%s", delegate)) + } + return fmt.Sprintf("%s[%s]", PluginName, strings.Join(delegateStrings, ",")) +} diff --git a/plugin/pkg/audit/dynamic/dynamic_test.go b/plugin/pkg/audit/dynamic/dynamic_test.go new file mode 100644 index 000000000..8b9c91407 --- /dev/null +++ b/plugin/pkg/audit/dynamic/dynamic_test.go @@ -0,0 +1,275 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamic + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "reflect" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" + "k8s.io/apiserver/pkg/audit" + webhook "k8s.io/apiserver/pkg/util/webhook" + informers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" +) + +func TestDynamic(t *testing.T) { + eventList1 := &atomic.Value{} + eventList1.Store(auditinternal.EventList{}) + eventList2 := &atomic.Value{} + eventList2.Store(auditinternal.EventList{}) + + // start test servers + server1 := httptest.NewServer(buildTestHandler(t, eventList1)) + defer server1.Close() + server2 := httptest.NewServer(buildTestHandler(t, eventList2)) + defer server2.Close() + + testPolicy := auditregv1alpha1.Policy{ + Level: auditregv1alpha1.LevelMetadata, + Stages: []auditregv1alpha1.Stage{ + auditregv1alpha1.StageResponseStarted, + }, + } + testEvent := auditinternal.Event{ + Level: auditinternal.LevelMetadata, + Stage: auditinternal.StageResponseStarted, + Verb: "get", + RequestURI: "/test/path", + } + testConfig1 := &auditregv1alpha1.AuditSink{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test1", + UID: types.UID("test1"), + }, + Spec: auditregv1alpha1.AuditSinkSpec{ + Policy: testPolicy, + Webhook: auditregv1alpha1.Webhook{ + ClientConfig: auditregv1alpha1.WebhookClientConfig{ + URL: &server1.URL, + }, + }, + }, + } + testConfig2 := &auditregv1alpha1.AuditSink{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test2", + UID: types.UID("test2"), + }, + Spec: auditregv1alpha1.AuditSinkSpec{ + Policy: testPolicy, + Webhook: auditregv1alpha1.Webhook{ + ClientConfig: auditregv1alpha1.WebhookClientConfig{ + URL: &server2.URL, + }, + }, + }, + } + + config, stopChan := defaultTestConfig() + config.BufferedConfig.MaxBatchWait = 10 * time.Millisecond + + b, err := NewBackend(config) + require.NoError(t, err) + d := b.(*backend) + err = b.Run(stopChan) + require.NoError(t, err) + + t.Run("find none", func(t *testing.T) { + require.Len(t, d.GetDelegates(), 0) + }) + + t.Run("find one", func(t *testing.T) { + d.addSink(testConfig1) + delegates := d.GetDelegates() + require.Len(t, delegates, 1) + require.Contains(t, delegates, types.UID("test1")) + require.Equal(t, testConfig1, delegates["test1"].configuration) + + // send event and check that it arrives + b.ProcessEvents(&testEvent) + err := checkForEvent(eventList1, testEvent) + require.NoError(t, err, "unable to find events sent to sink") + }) + + t.Run("find two", func(t *testing.T) { + eventList1.Store(auditinternal.EventList{}) + d.addSink(testConfig2) + delegates := d.GetDelegates() + require.Len(t, delegates, 2) + require.Contains(t, delegates, types.UID("test1")) + require.Contains(t, delegates, types.UID("test2")) + require.Equal(t, testConfig1, delegates["test1"].configuration) + require.Equal(t, testConfig2, delegates["test2"].configuration) + + // send event to both delegates and check that it arrives in both places + b.ProcessEvents(&testEvent) + err := checkForEvent(eventList1, testEvent) + require.NoError(t, err, "unable to find events sent to sink 1") + err = checkForEvent(eventList2, testEvent) + require.NoError(t, err, "unable to find events sent to sink 2") + }) + + t.Run("delete one", func(t *testing.T) { + eventList2.Store(auditinternal.EventList{}) + d.deleteSink(testConfig1) + delegates := d.GetDelegates() + require.Len(t, delegates, 1) + require.Contains(t, delegates, types.UID("test2")) + require.Equal(t, testConfig2, delegates["test2"].configuration) + + // send event and check that it arrives to remaining sink + b.ProcessEvents(&testEvent) + err := checkForEvent(eventList2, testEvent) + require.NoError(t, err, "unable to find events sent to sink") + }) + + t.Run("update one", func(t *testing.T) { + eventList1.Store(auditinternal.EventList{}) + oldConfig := *testConfig2 + testConfig2.Spec.Webhook.ClientConfig.URL = &server1.URL + testConfig2.UID = types.UID("test2.1") + d.updateSink(&oldConfig, testConfig2) + delegates := d.GetDelegates() + require.Len(t, delegates, 1) + require.Contains(t, delegates, types.UID("test2.1")) + require.Equal(t, testConfig2, delegates["test2.1"].configuration) + + // send event and check that it arrives to updated sink + b.ProcessEvents(&testEvent) + err := checkForEvent(eventList1, testEvent) + require.NoError(t, err, "unable to find events sent to sink") + }) + + t.Run("update meta only", func(t *testing.T) { + eventList1.Store(auditinternal.EventList{}) + oldConfig := *testConfig2 + testConfig2.UID = types.UID("test2.2") + testConfig2.Labels = map[string]string{"my": "label"} + d.updateSink(&oldConfig, testConfig2) + delegates := d.GetDelegates() + require.Len(t, delegates, 1) + require.Contains(t, delegates, types.UID("test2.2")) + + // send event and check that it arrives to same sink + b.ProcessEvents(&testEvent) + err := checkForEvent(eventList1, testEvent) + require.NoError(t, err, "unable to find events sent to sink") + }) + + t.Run("shutdown", func(t *testing.T) { + // if the stop signal is not propagated correctly the buffers will not + // close down gracefully, and the shutdown method will hang causing + // the test will timeout. + timeoutChan := make(chan struct{}) + successChan := make(chan struct{}) + go func() { + time.Sleep(1 * time.Second) + timeoutChan <- struct{}{} + }() + go func() { + close(stopChan) + d.Shutdown() + successChan <- struct{}{} + }() + for { + select { + case <-timeoutChan: + t.Error("shutdown timed out") + return + case <-successChan: + return + } + } + }) +} + +// checkForEvent will poll to check for an audit event in an atomic event list +func checkForEvent(a *atomic.Value, evSent auditinternal.Event) error { + return wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) { + el := a.Load().(auditinternal.EventList) + if len(el.Items) != 1 { + return false, nil + } + evFound := el.Items[0] + eq := reflect.DeepEqual(evSent, evFound) + if !eq { + return false, fmt.Errorf("event mismatch -- sent: %+v found: %+v", evSent, evFound) + } + return true, nil + }) +} + +// buildTestHandler returns a handler that will update the atomic value passed in +// with the event list it receives +func buildTestHandler(t *testing.T, a *atomic.Value) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Fatalf("could not read request body: %v", err) + } + el := auditinternal.EventList{} + decoder := audit.Codecs.UniversalDecoder(auditv1.SchemeGroupVersion) + if err := runtime.DecodeInto(decoder, body, &el); err != nil { + t.Fatalf("failed decoding buf: %b, apiVersion: %s", body, auditv1.SchemeGroupVersion) + } + defer r.Body.Close() + a.Store(el) + w.WriteHeader(200) + }) +} + +// defaultTestConfig returns a Config object suitable for testing along with its +// associated stopChan +func defaultTestConfig() (*Config, chan struct{}) { + authWrapper := webhook.AuthenticationInfoResolverWrapper( + func(a webhook.AuthenticationInfoResolver) webhook.AuthenticationInfoResolver { return a }, + ) + client := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, 0) + stop := make(chan struct{}) + + eventSink := &v1core.EventSinkImpl{Interface: client.CoreV1().Events("")} + + informerFactory.Start(stop) + informerFactory.WaitForCacheSync(stop) + informer := informerFactory.Auditregistration().V1alpha1().AuditSinks() + return &Config{ + Informer: informer, + EventConfig: EventConfig{Sink: eventSink}, + BufferedConfig: NewDefaultWebhookBatchConfig(), + WebhookConfig: WebhookConfig{ + AuthInfoResolverWrapper: authWrapper, + ServiceResolver: webhook.NewDefaultServiceResolver(), + }, + }, stop +} diff --git a/plugin/pkg/audit/dynamic/enforced/enforced.go b/plugin/pkg/audit/dynamic/enforced/enforced.go new file mode 100644 index 000000000..1263a59ce --- /dev/null +++ b/plugin/pkg/audit/dynamic/enforced/enforced.go @@ -0,0 +1,90 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package enforced + +import ( + "fmt" + + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/audit" + ev "k8s.io/apiserver/pkg/audit/event" + "k8s.io/apiserver/pkg/audit/policy" +) + +// PluginName is the name reported in error metrics. +const PluginName = "enforced" + +// Backend filters audit events according to the policy +// trimming them as necessary to match the level +type Backend struct { + policyChecker policy.Checker + delegateBackend audit.Backend +} + +// NewBackend returns an enforced audit backend that wraps delegate backend. +// Enforced backend automatically runs and shuts down the delegate backend. +func NewBackend(delegate audit.Backend, p policy.Checker) audit.Backend { + return &Backend{ + policyChecker: p, + delegateBackend: delegate, + } +} + +// Run the delegate backend +func (b Backend) Run(stopCh <-chan struct{}) error { + return b.delegateBackend.Run(stopCh) +} + +// Shutdown the delegate backend +func (b Backend) Shutdown() { + b.delegateBackend.Shutdown() +} + +// ProcessEvents enforces policy on a shallow copy of the given event +// dropping any sections that don't conform +func (b Backend) ProcessEvents(events ...*auditinternal.Event) { + for _, event := range events { + if event == nil { + continue + } + attr, err := ev.NewAttributes(event) + if err != nil { + audit.HandlePluginError(PluginName, err, event) + continue + } + level, stages := b.policyChecker.LevelAndStages(attr) + if level == auditinternal.LevelNone { + continue + } + // make shallow copy before modifying to satisfy interface definition + ev := *event + e, err := policy.EnforcePolicy(&ev, level, stages) + if err != nil { + audit.HandlePluginError(PluginName, err, event) + continue + } + if e == nil { + continue + } + b.delegateBackend.ProcessEvents(e) + } +} + +// String returns a string representation of the backend +func (b Backend) String() string { + return fmt.Sprintf("%s<%s>", PluginName, b.delegateBackend) +} diff --git a/plugin/pkg/audit/dynamic/enforced/enforced_test.go b/plugin/pkg/audit/dynamic/enforced/enforced_test.go new file mode 100644 index 000000000..cbc61327a --- /dev/null +++ b/plugin/pkg/audit/dynamic/enforced/enforced_test.go @@ -0,0 +1,117 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package enforced + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "k8s.io/apimachinery/pkg/runtime" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/audit/policy" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/authorization/authorizer" + fakeplugin "k8s.io/apiserver/plugin/pkg/audit/fake" +) + +func TestEnforced(t *testing.T) { + testCases := []struct { + name string + event *auditinternal.Event + policy auditinternal.Policy + attribs authorizer.Attributes + expected []*auditinternal.Event + }{ + { + name: "enforce level", + event: &auditinternal.Event{ + Level: auditinternal.LevelRequestResponse, + Stage: auditinternal.StageResponseComplete, + RequestURI: "/apis/extensions/v1beta1", + RequestObject: &runtime.Unknown{Raw: []byte(`test`)}, + ResponseObject: &runtime.Unknown{Raw: []byte(`test`)}, + }, + policy: auditinternal.Policy{ + Rules: []auditinternal.PolicyRule{ + { + Level: auditinternal.LevelMetadata, + }, + }, + }, + expected: []*auditinternal.Event{ + { + Level: auditinternal.LevelMetadata, + Stage: auditinternal.StageResponseComplete, + RequestURI: "/apis/extensions/v1beta1", + }, + }, + }, + { + name: "enforce policy rule", + event: &auditinternal.Event{ + Level: auditinternal.LevelRequestResponse, + Stage: auditinternal.StageResponseComplete, + RequestURI: "/apis/extensions/v1beta1", + User: auditinternal.UserInfo{ + Username: user.Anonymous, + }, + RequestObject: &runtime.Unknown{Raw: []byte(`test`)}, + ResponseObject: &runtime.Unknown{Raw: []byte(`test`)}, + }, + policy: auditinternal.Policy{ + Rules: []auditinternal.PolicyRule{ + { + Level: auditinternal.LevelNone, + Users: []string{user.Anonymous}, + }, + { + Level: auditinternal.LevelMetadata, + }, + }, + }, + expected: []*auditinternal.Event{}, + }, + { + name: "nil event", + event: nil, + policy: auditinternal.Policy{ + Rules: []auditinternal.PolicyRule{ + { + Level: auditinternal.LevelMetadata, + }, + }, + }, + expected: []*auditinternal.Event{}, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ev := []*auditinternal.Event{} + fakeBackend := fakeplugin.Backend{ + OnRequest: func(events []*auditinternal.Event) { + ev = events + }, + } + b := NewBackend(&fakeBackend, policy.NewChecker(&tc.policy)) + defer b.Shutdown() + + b.ProcessEvents(tc.event) + require.Equal(t, tc.expected, ev) + }) + } +} diff --git a/plugin/pkg/audit/dynamic/factory.go b/plugin/pkg/audit/dynamic/factory.go new file mode 100644 index 000000000..f9ce7abf7 --- /dev/null +++ b/plugin/pkg/audit/dynamic/factory.go @@ -0,0 +1,91 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamic + +import ( + "fmt" + "time" + + auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1" + "k8s.io/apiserver/pkg/audit" + "k8s.io/apiserver/pkg/audit/policy" + auditutil "k8s.io/apiserver/pkg/audit/util" + "k8s.io/apiserver/pkg/util/webhook" + bufferedplugin "k8s.io/apiserver/plugin/pkg/audit/buffered" + enforcedplugin "k8s.io/apiserver/plugin/pkg/audit/dynamic/enforced" + webhookplugin "k8s.io/apiserver/plugin/pkg/audit/webhook" +) + +// TODO: find a common place for all the default retry backoffs +const retryBackoff = 500 * time.Millisecond + +// factory builds a delegate from an AuditSink +type factory struct { + config *Config + webhookClientManager webhook.ClientManager + sink *auditregv1alpha1.AuditSink +} + +// BuildDelegate creates a delegate from the AuditSink object +func (f *factory) BuildDelegate() (*delegate, error) { + backend, err := f.buildWebhookBackend() + if err != nil { + return nil, err + } + backend = f.applyEnforcedOpts(backend) + backend = f.applyBufferedOpts(backend) + ch := make(chan struct{}) + return &delegate{ + Backend: backend, + configuration: f.sink, + stopChan: ch, + }, nil +} + +func (f *factory) buildWebhookBackend() (audit.Backend, error) { + hookClient := auditutil.HookClientConfigForSink(f.sink) + client, err := f.webhookClientManager.HookClient(hookClient) + if err != nil { + return nil, fmt.Errorf("could not create webhook client: %v", err) + } + backend := webhookplugin.NewDynamicBackend(client, retryBackoff) + return backend, nil +} + +func (f *factory) applyEnforcedOpts(delegate audit.Backend) audit.Backend { + pol := policy.ConvertDynamicPolicyToInternal(&f.sink.Spec.Policy) + checker := policy.NewChecker(pol) + eb := enforcedplugin.NewBackend(delegate, checker) + return eb +} + +func (f *factory) applyBufferedOpts(delegate audit.Backend) audit.Backend { + bc := f.config.BufferedConfig + tc := f.sink.Spec.Webhook.Throttle + if tc != nil { + bc.ThrottleEnable = true + if tc.Burst != nil { + bc.ThrottleBurst = int(*tc.Burst) + } + if tc.QPS != nil { + bc.ThrottleQPS = float32(*tc.QPS) + } + } else { + bc.ThrottleEnable = false + } + return bufferedplugin.NewBackend(delegate, *bc) +} diff --git a/plugin/pkg/audit/dynamic/factory_test.go b/plugin/pkg/audit/dynamic/factory_test.go new file mode 100644 index 000000000..3caa2c3a5 --- /dev/null +++ b/plugin/pkg/audit/dynamic/factory_test.go @@ -0,0 +1,146 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamic + +import ( + "testing" + + "github.com/stretchr/testify/require" + + auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1" + utilpointer "k8s.io/utils/pointer" +) + +func TestToDelegate(t *testing.T) { + config, _ := defaultTestConfig() + defaultPolicy := auditregv1alpha1.Policy{ + Level: auditregv1alpha1.LevelMetadata, + } + u := "http://localhost:4444" + for _, tc := range []struct { + name string + auditConfig *auditregv1alpha1.AuditSink + throttleConfig *auditregv1alpha1.WebhookThrottleConfig + expectedBackend string + }{ + { + name: "build full", + auditConfig: &auditregv1alpha1.AuditSink{ + Spec: auditregv1alpha1.AuditSinkSpec{ + Policy: defaultPolicy, + Webhook: auditregv1alpha1.Webhook{ + Throttle: &auditregv1alpha1.WebhookThrottleConfig{ + QPS: utilpointer.Int64Ptr(10), + Burst: utilpointer.Int64Ptr(5), + }, + ClientConfig: auditregv1alpha1.WebhookClientConfig{ + URL: &u, + }, + }, + }, + }, + expectedBackend: "buffered>", + }, + { + name: "build no throttle", + auditConfig: &auditregv1alpha1.AuditSink{ + Spec: auditregv1alpha1.AuditSinkSpec{ + Policy: defaultPolicy, + Webhook: auditregv1alpha1.Webhook{ + ClientConfig: auditregv1alpha1.WebhookClientConfig{ + URL: &u, + }, + }, + }, + }, + expectedBackend: "buffered>", + }, + } { + t.Run(tc.name, func(t *testing.T) { + b, err := NewBackend(config) + require.NoError(t, err) + c := factory{ + config: b.(*backend).config, + webhookClientManager: b.(*backend).webhookClientManager, + sink: tc.auditConfig, + } + d, err := c.BuildDelegate() + require.NoError(t, err) + require.Equal(t, tc.expectedBackend, d.String()) + }) + } +} + +func TestBuildWebhookBackend(t *testing.T) { + defaultPolicy := auditregv1alpha1.Policy{ + Level: auditregv1alpha1.LevelMetadata, + } + config, _ := defaultTestConfig() + b, err := NewBackend(config) + require.NoError(t, err) + d := b.(*backend) + u := "http://localhost:4444" + for _, tc := range []struct { + name string + auditConfig *auditregv1alpha1.AuditSink + shouldErr bool + expectedBackend string + }{ + { + name: "build full", + auditConfig: &auditregv1alpha1.AuditSink{ + Spec: auditregv1alpha1.AuditSinkSpec{ + Policy: defaultPolicy, + Webhook: auditregv1alpha1.Webhook{ + ClientConfig: auditregv1alpha1.WebhookClientConfig{ + URL: &u, + }, + }, + }, + }, + expectedBackend: "dynamic_webhook", + shouldErr: false, + }, + { + name: "fail missing url", + auditConfig: &auditregv1alpha1.AuditSink{ + Spec: auditregv1alpha1.AuditSinkSpec{ + Policy: defaultPolicy, + Webhook: auditregv1alpha1.Webhook{ + ClientConfig: auditregv1alpha1.WebhookClientConfig{}, + }, + }, + }, + shouldErr: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + c := &factory{ + config: config, + webhookClientManager: d.webhookClientManager, + sink: tc.auditConfig, + } + ab, err := c.buildWebhookBackend() + if tc.shouldErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedBackend, ab.String()) + }) + } +} diff --git a/plugin/pkg/audit/webhook/webhook.go b/plugin/pkg/audit/webhook/webhook.go index 80b4842fd..5f5f7169d 100644 --- a/plugin/pkg/audit/webhook/webhook.go +++ b/plugin/pkg/audit/webhook/webhook.go @@ -18,6 +18,7 @@ limitations under the License. package webhook import ( + "fmt" "time" "k8s.io/apimachinery/pkg/runtime/schema" @@ -47,7 +48,20 @@ func loadWebhook(configFile string, groupVersion schema.GroupVersion, initialBac } type backend struct { - w *webhook.GenericWebhook + w *webhook.GenericWebhook + name string +} + +// NewDynamicBackend returns an audit backend configured from a REST client that +// sends events over HTTP to an external service. +func NewDynamicBackend(rc *rest.RESTClient, initialBackoff time.Duration) audit.Backend { + return &backend{ + w: &webhook.GenericWebhook{ + RestClient: rc, + InitialBackoff: initialBackoff, + }, + name: fmt.Sprintf("dynamic_%s", PluginName), + } } // NewBackend returns an audit backend that sends events over HTTP to an external service. @@ -56,7 +70,7 @@ func NewBackend(kubeConfigFile string, groupVersion schema.GroupVersion, initial if err != nil { return nil, err } - return &backend{w}, nil + return &backend{w: w, name: PluginName}, nil } func (b *backend) Run(stopCh <-chan struct{}) error { @@ -69,7 +83,7 @@ func (b *backend) Shutdown() { func (b *backend) ProcessEvents(ev ...*auditinternal.Event) { if err := b.processEvents(ev...); err != nil { - audit.HandlePluginError(PluginName, err, ev...) + audit.HandlePluginError(b.String(), err, ev...) } } @@ -84,5 +98,5 @@ func (b *backend) processEvents(ev ...*auditinternal.Event) error { } func (b *backend) String() string { - return PluginName + return b.name }