diff --git a/pkg/apis/audit/helpers.go b/pkg/apis/audit/helpers.go new file mode 100644 index 000000000..3f54fc1e8 --- /dev/null +++ b/pkg/apis/audit/helpers.go @@ -0,0 +1,46 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package audit + +func ordLevel(l Level) int { + switch l { + case LevelMetadata: + return 1 + case LevelRequest: + return 2 + case LevelRequestResponse: + return 3 + default: + return 0 + } +} + +func (a Level) Less(b Level) bool { + return ordLevel(a) < ordLevel(b) +} + +func (a Level) GreaterOrEqual(b Level) bool { + return ordLevel(a) >= ordLevel(b) +} + +func NewConstantPolicy(level Level) *Policy { + return &Policy{ + Rules: []PolicyRule{ + {Level: level}, + }, + } +} diff --git a/pkg/audit/request.go b/pkg/audit/request.go new file mode 100644 index 000000000..36d82f204 --- /dev/null +++ b/pkg/audit/request.go @@ -0,0 +1,202 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package audit + +import ( + "bytes" + "fmt" + "net/http" + "strings" + "time" + + "github.com/golang/glog" + "github.com/pborman/uuid" + + "reflect" + + "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apiserver/pkg/apis/audit" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/authorization/authorizer" + authenticationv1 "k8s.io/client-go/pkg/apis/authentication/v1" +) + +const ( + AuditIDHeader = "X-Request-ID" +) + +func NewEventFromRequest(req *http.Request, policy *auditinternal.Policy, attribs authorizer.Attributes) (*auditinternal.Event, error) { + ev := &auditinternal.Event{ + Timestamp: metav1.NewTime(time.Now()), + Verb: attribs.GetVerb(), + RequestURI: req.URL.RequestURI(), + } + + // set the level + ev.Level = auditinternal.LevelNone + if policy != nil && len(policy.Rules) > 0 { + // This is just a hack to get through the test without setting a high level by default. + // TODO(audit): add the policy evalutation here + ev.Level = policy.Rules[0].Level + } + + // prefer the id from the headers. If not available, create a new one. + // TODO(audit): do we want to forbid the header for non-front-proxy users? + ids := req.Header[AuditIDHeader] + if len(ids) > 0 { + ev.AuditID = types.UID(ids[0]) + } else { + ev.AuditID = types.UID(uuid.NewRandom().String()) + } + + ips := utilnet.SourceIPs(req) + ev.SourceIPs = make([]string, len(ips)) + for i := range ips { + ev.SourceIPs[i] = ips[i].String() + } + + if user := attribs.GetUser(); user != nil { + ev.User.Username = user.GetName() + ev.User.Extra = map[string]auditinternal.ExtraValue{} + for k, v := range user.GetExtra() { + ev.User.Extra[k] = auditinternal.ExtraValue(v) + } + ev.User.Groups = user.GetGroups() + ev.User.UID = user.GetUID() + } + + if asuser := req.Header.Get(authenticationv1.ImpersonateUserHeader); len(asuser) > 0 { + ev.ImpersonatedUser = &auditinternal.UserInfo{ + Username: asuser, + } + if requestedGroups := req.Header[authenticationv1.ImpersonateGroupHeader]; len(requestedGroups) > 0 { + ev.ImpersonatedUser.Groups = requestedGroups + } + + ev.ImpersonatedUser.Extra = map[string]auditinternal.ExtraValue{} + for k, v := range req.Header { + if !strings.HasPrefix(k, authenticationv1.ImpersonateUserExtraHeaderPrefix) { + continue + } + k = k[len(authenticationv1.ImpersonateUserExtraHeaderPrefix):] + ev.ImpersonatedUser.Extra[k] = auditinternal.ExtraValue(v) + } + } + + if attribs.IsResourceRequest() { + ev.ObjectRef = &auditinternal.ObjectReference{ + Namespace: attribs.GetNamespace(), + Name: attribs.GetName(), + Resource: attribs.GetResource(), + APIVersion: attribs.GetAPIGroup() + "/" + attribs.GetAPIVersion(), + } + } + + return ev, nil +} + +// LogRequestObject fills in the request object into an audit event. The passed runtime.Object +// will be converted to the given gv. +func LogRequestObject(ae *audit.Event, obj runtime.Object, gv schema.GroupVersion, s runtime.NegotiatedSerializer) { + if ae == nil || ae.Level.Less(audit.LevelRequest) { + return + } + + // TODO(audit): hook into the serializer to avoid double conversion + var err error + ae.RequestObject, err = encodeObject(obj, gv, s) + if err != nil { + // TODO(audit): add error slice to audit event struct + glog.Warningf("Auditing failed of %v request: %v", reflect.TypeOf(obj).Name(), err) + return + } + + // complete ObjectRef + if ae.ObjectRef == nil { + ae.ObjectRef = &audit.ObjectReference{} + } + if acc, ok := obj.(v1.ObjectMetaAccessor); ok { + meta := acc.GetObjectMeta() + if len(ae.ObjectRef.Namespace) == 0 { + ae.ObjectRef.Namespace = meta.GetNamespace() + } + if len(ae.ObjectRef.Name) == 0 { + ae.ObjectRef.Name = meta.GetName() + } + if len(ae.ObjectRef.UID) == 0 { + ae.ObjectRef.UID = meta.GetUID() + } + if len(ae.ObjectRef.ResourceVersion) == 0 { + ae.ObjectRef.ResourceVersion = meta.GetResourceVersion() + } + } +} + +// LogRquestPatch fills in the given patch as the request object into an audit event. +func LogRequestPatch(ae *audit.Event, patch []byte) { + if ae == nil || ae.Level.Less(audit.LevelRequest) { + return + } + + ae.RequestObject = runtime.Unknown{ + Raw: patch, + ContentType: runtime.ContentTypeJSON, + } +} + +// LogResponseObject fills in the response object into an audit event. The passed runtime.Object +// will be converted to the given gv. +func LogResponseObject(ae *audit.Event, obj runtime.Object, gv schema.GroupVersion, s runtime.NegotiatedSerializer) { + if ae == nil || ae.Level.Less(audit.LevelRequestResponse) { + return + } + + if status, ok := obj.(*metav1.Status); ok { + ae.ResponseStatus = status + } + + // TODO(audit): hook into the serializer to avoid double conversion + var err error + ae.ResponseObject, err = encodeObject(obj, gv, s) + if err != nil { + glog.Warningf("Audit failed for %q response: %v", reflect.TypeOf(obj).Name(), err) + } +} + +func encodeObject(obj runtime.Object, gv schema.GroupVersion, serializer runtime.NegotiatedSerializer) (runtime.Unknown, error) { + supported := serializer.SupportedMediaTypes() + for i := range supported { + if supported[i].MediaType == "application/json" { + enc := serializer.EncoderForVersion(supported[i].Serializer, gv) + var buf bytes.Buffer + if err := enc.Encode(obj, &buf); err != nil { + return runtime.Unknown{}, fmt.Errorf("encoding failed: %v", err) + } + + return runtime.Unknown{ + Raw: buf.Bytes(), + ContentType: runtime.ContentTypeJSON, + }, nil + } + } + return runtime.Unknown{}, fmt.Errorf("no json encoder found") +} diff --git a/pkg/audit/types.go b/pkg/audit/types.go new file mode 100644 index 000000000..a7c10cf03 --- /dev/null +++ b/pkg/audit/types.go @@ -0,0 +1,37 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package audit + +import ( + auditinternal "k8s.io/apiserver/pkg/apis/audit" +) + +type Sink interface { + // ProcessEvents handles events. Per audit ID it might be that ProcessEvents is called up to three times. + // Errors might be logged by the sink itself. If an error should be fatal, leading to an internal + // error, ProcessEvents is supposed to panic. The event must not be mutated and is reused by the caller + // after the call returns, i.e. the sink has to make a deepcopy to keep a copy around if necessary. + ProcessEvents(events ...*auditinternal.Event) +} + +type Backend interface { + Sink + + // Run will initialize the backend. It must not block, but may run go routines in the background. If + // stopCh is closed, it is supposed to stop them. Run will be called before the first call to ProcessEvents. + Run(stopCh <-chan struct{}) error +} diff --git a/pkg/endpoints/apiserver_test.go b/pkg/endpoints/apiserver_test.go index 26ec48146..a37f5f03b 100644 --- a/pkg/endpoints/apiserver_test.go +++ b/pkg/endpoints/apiserver_test.go @@ -52,9 +52,11 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/admission" + auditinternal "k8s.io/apiserver/pkg/apis/audit" "k8s.io/apiserver/pkg/apis/example" examplefuzzer "k8s.io/apiserver/pkg/apis/example/fuzzer" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" + "k8s.io/apiserver/pkg/audit" genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/request" @@ -256,25 +258,25 @@ type defaultAPIServer struct { // uses the default settings func handle(storage map[string]rest.Storage) http.Handler { - return handleInternal(storage, admissionControl, selfLinker) + return handleInternal(storage, admissionControl, selfLinker, nil) } // tests with a deny admission controller func handleDeny(storage map[string]rest.Storage) http.Handler { - return handleInternal(storage, alwaysDeny{}, selfLinker) + return handleInternal(storage, alwaysDeny{}, selfLinker, nil) } // tests using the new namespace scope mechanism func handleNamespaced(storage map[string]rest.Storage) http.Handler { - return handleInternal(storage, admissionControl, selfLinker) + return handleInternal(storage, admissionControl, selfLinker, nil) } // tests using a custom self linker func handleLinker(storage map[string]rest.Storage, selfLinker runtime.SelfLinker) http.Handler { - return handleInternal(storage, admissionControl, selfLinker) + return handleInternal(storage, admissionControl, selfLinker, nil) } -func handleInternal(storage map[string]rest.Storage, admissionControl admission.Interface, selfLinker runtime.SelfLinker) http.Handler { +func handleInternal(storage map[string]rest.Storage, admissionControl admission.Interface, selfLinker runtime.SelfLinker, auditSink audit.Sink) http.Handler { container := restful.NewContainer() container.Router(restful.CurlyRouter{}) mux := container.ServeMux @@ -332,7 +334,11 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. } } - handler := genericapifilters.WithRequestInfo(mux, testRequestInfoResolver(), requestContextMapper) + handler := genericapifilters.WithAudit(mux, requestContextMapper, auditSink, auditinternal.NewConstantPolicy(auditinternal.LevelRequestResponse), func(r *http.Request, requestInfo *request.RequestInfo) bool { + // simplified long-running check + return requestInfo.Verb == "watch" || requestInfo.Verb == "proxy" + }) + handler = genericapifilters.WithRequestInfo(handler, testRequestInfoResolver(), requestContextMapper) handler = request.WithRequestContext(handler, requestContextMapper) return &defaultAPIServer{handler, container} @@ -1088,7 +1094,7 @@ func TestList(t *testing.T) { namespace: testCase.namespace, expectedSet: testCase.selfLink, } - var handler = handleInternal(storage, admissionControl, selfLinker) + var handler = handleInternal(storage, admissionControl, selfLinker, nil) server := httptest.NewServer(handler) defer server.Close() @@ -1768,7 +1774,7 @@ func TestGetNamespaceSelfLink(t *testing.T) { namespace: "foo", } storage["simple"] = &simpleStorage - handler := handleInternal(storage, admissionControl, selfLinker) + handler := handleInternal(storage, admissionControl, selfLinker, nil) server := httptest.NewServer(handler) defer server.Close() @@ -3166,7 +3172,7 @@ func TestCreateInvokesAdmissionControl(t *testing.T) { namespace: "other", expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/other/foo/bar", } - handler := handleInternal(map[string]rest.Storage{"foo": &storage}, alwaysDeny{}, selfLinker) + handler := handleInternal(map[string]rest.Storage{"foo": &storage}, alwaysDeny{}, selfLinker, nil) server := httptest.NewServer(handler) defer server.Close() client := http.Client{} @@ -3248,7 +3254,7 @@ func (obj *UnregisteredAPIObject) GetObjectKind() schema.ObjectKind { func TestWriteJSONDecodeError(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - responsewriters.WriteObjectNegotiated(codecs, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"}) + responsewriters.WriteObjectNegotiated(nil, codecs, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"}) })) defer server.Close() // We send a 200 status code before we encode the object, so we expect OK, but there will diff --git a/pkg/endpoints/audit_test.go b/pkg/endpoints/audit_test.go new file mode 100644 index 000000000..c6a7a840f --- /dev/null +++ b/pkg/endpoints/audit_test.go @@ -0,0 +1,326 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoints + +import ( + "bytes" + "fmt" + "net/http" + "net/http/httptest" + "regexp" + "sync" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + genericapitesting "k8s.io/apiserver/pkg/endpoints/testing" + "k8s.io/apiserver/pkg/registry/rest" +) + +type fakeAuditSink struct { + lock sync.Mutex + events []*auditinternal.Event +} + +func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) { + s.lock.Lock() + defer s.lock.Unlock() + s.events = append(s.events, evs...) +} + +func (s *fakeAuditSink) Events() []*auditinternal.Event { + s.lock.Lock() + defer s.lock.Unlock() + return append([]*auditinternal.Event{}, s.events...) +} + +func TestAudit(t *testing.T) { + type eventCheck func(events []*auditinternal.Event) error + + // fixtures + simpleFoo := &genericapitesting.Simple{Other: "foo"} + simpleFooJSON, _ := runtime.Encode(testCodec, simpleFoo) + + simpleCPrime := &genericapitesting.Simple{ + ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: "other"}, + Other: "bla", + } + simpleCPrimeJSON, _ := runtime.Encode(testCodec, simpleCPrime) + + // event checks + requestBodyIs := func(i int, text string) eventCheck { + return func(events []*auditinternal.Event) error { + if string(events[i].RequestObject.Raw) != text { + return fmt.Errorf("expected RequestBody %q, got %q", text, string(events[i].RequestObject.Raw)) + } + return nil + } + } + requestBodyMatches := func(i int, pattern string) eventCheck { + return func(events []*auditinternal.Event) error { + if matched, _ := regexp.Match(pattern, events[i].RequestObject.Raw); !matched { + return fmt.Errorf("expected RequestBody to match %q, but didn't: %q", pattern, string(events[i].RequestObject.Raw)) + } + return nil + } + } + responseBodyIs := func(i int, text string) eventCheck { + return func(events []*auditinternal.Event) error { + if string(events[i].ResponseObject.Raw) != text { + return fmt.Errorf("expected ResponseBody %q, got %q", text, string(events[i].ResponseObject.Raw)) + } + return nil + } + } + responseBodyMatches := func(i int, pattern string) eventCheck { + return func(events []*auditinternal.Event) error { + if matched, _ := regexp.Match(pattern, events[i].ResponseObject.Raw); !matched { + return fmt.Errorf("expected ResponseBody to match %q, but didn't: %q", pattern, string(events[i].ResponseObject.Raw)) + } + return nil + } + } + + for _, test := range []struct { + desc string + req func(server string) (*http.Request, error) + linker runtime.SelfLinker + code int + events int + checks []eventCheck + }{ + { + "get", + func(server string) (*http.Request, error) { + return http.NewRequest("GET", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple/c", bytes.NewBuffer(simpleFooJSON)) + }, + selfLinker, + 200, + 1, + []eventCheck{ + requestBodyIs(0, ""), + responseBodyMatches(0, `{.*"name":"c".*}`), + }, + }, + { + "list", + func(server string) (*http.Request, error) { + return http.NewRequest("GET", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple?labelSelector=a%3Dfoobar", nil) + }, + &setTestSelfLinker{ + t: t, + expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/other/simple", + namespace: "other", + }, + 200, + 1, + []eventCheck{ + requestBodyMatches(0, ""), + responseBodyMatches(0, `{.*"name":"a".*"name":"b".*}`), + }, + }, + { + "create", + func(server string) (*http.Request, error) { + return http.NewRequest("POST", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple", bytes.NewBuffer(simpleFooJSON)) + }, + selfLinker, + 201, + 1, + []eventCheck{ + requestBodyIs(0, string(simpleFooJSON)), + responseBodyMatches(0, `{.*"foo".*}`), + }, + }, + { + "not-allowed-named-create", + func(server string) (*http.Request, error) { + return http.NewRequest("POST", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/named", bytes.NewBuffer(simpleFooJSON)) + }, + selfLinker, + 405, + 1, + []eventCheck{ + requestBodyIs(0, ""), // the 405 is thrown long before the create handler would be executed + responseBodyIs(0, ""), // the 405 is thrown long before the create handler would be executed + }, + }, + { + "delete", + func(server string) (*http.Request, error) { + return http.NewRequest("DELETE", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/a", nil) + }, + selfLinker, + 200, + 1, + []eventCheck{ + requestBodyMatches(0, ""), + responseBodyMatches(0, ""), + }, + }, + { + "delete-with-options-in-body", + func(server string) (*http.Request, error) { + return http.NewRequest("DELETE", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/a", bytes.NewBuffer([]byte(`{"kind":"DeleteOptions"}`))) + }, + selfLinker, + 200, + 1, + []eventCheck{ + requestBodyMatches(0, "DeleteOptions"), + responseBodyMatches(0, ""), + }, + }, + { + "update", + func(server string) (*http.Request, error) { + return http.NewRequest("PUT", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple/c", bytes.NewBuffer(simpleCPrimeJSON)) + }, + selfLinker, + 200, + 1, + []eventCheck{ + requestBodyIs(0, string(simpleCPrimeJSON)), + responseBodyMatches(0, `{.*"bla".*}`), + }, + }, + { + "update-wrong-namespace", + func(server string) (*http.Request, error) { + return http.NewRequest("PUT", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/c", bytes.NewBuffer(simpleCPrimeJSON)) + }, + selfLinker, + 400, + 1, + []eventCheck{ + requestBodyIs(0, string(simpleCPrimeJSON)), + responseBodyMatches(0, `"Status".*"status":"Failure".*"code":400}`), + }, + }, + { + "patch", + func(server string) (*http.Request, error) { + req, _ := http.NewRequest("PATCH", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple/c", bytes.NewReader([]byte(`{"labels":{"foo":"bar"}}`))) + req.Header.Set("Content-Type", "application/merge-patch+json; charset=UTF-8") + return req, nil + }, + &setTestSelfLinker{ + t: t, + expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/other/simple/c", + name: "c", + namespace: "other", + }, + 200, + 1, + []eventCheck{ + requestBodyIs(0, `{"labels":{"foo":"bar"}}`), + responseBodyMatches(0, `"name":"c".*"labels":{"foo":"bar"}`), + }, + }, + { + "watch", + func(server string) (*http.Request, error) { + return http.NewRequest("GET", server+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/other/simple?watch=true", nil) + }, + &setTestSelfLinker{ + t: t, + expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/other/simple", + namespace: "other", + }, + 200, + 2, + []eventCheck{ + requestBodyMatches(0, ""), + responseBodyMatches(0, ""), + }, + }, + } { + sink := &fakeAuditSink{} + handler := handleInternal(map[string]rest.Storage{ + "simple": &SimpleRESTStorage{ + list: []genericapitesting.Simple{ + { + ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "other"}, + Other: "foo", + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "b", Namespace: "other"}, + Other: "foo", + }, + }, + item: genericapitesting.Simple{ + ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: "other", UID: "uid"}, + Other: "foo", + }, + }, + }, admissionControl, selfLinker, sink) + + server := httptest.NewServer(handler) + defer server.Close() + client := http.Client{Timeout: 2 * time.Second} + + req, err := test.req(server.URL) + if err != nil { + t.Errorf("[%s] error creating the request: %v", test.desc, err) + } + + response, err := client.Do(req) + if err != nil { + t.Errorf("[%s] error: %v", test.desc, err) + } + + if response.StatusCode != test.code { + t.Errorf("[%s] expected http code %d, got %#v", test.desc, test.code, response) + } + + // close body because the handler might block in Flush, unable to send the remaining event. + response.Body.Close() + + // wait for events to arrive, at least the given number in the test + events := []*auditinternal.Event{} + err = wait.Poll(50*time.Millisecond, wait.ForeverTestTimeout, wait.ConditionFunc(func() (done bool, err error) { + events = sink.Events() + return len(events) >= test.events, nil + })) + if err != nil { + t.Errorf("[%s] timeout waiting for events", test.desc) + } + + if got := len(events); got != test.events { + t.Errorf("[%s] expected %d audit events, got %d", test.desc, test.events, got) + } else { + for i, check := range test.checks { + err := check(events) + if err != nil { + t.Errorf("[%s,%d] %v", test.desc, i, err) + } + } + } + + if len(events) > 0 { + status := events[len(events)-1].ResponseStatus + if status == nil { + t.Errorf("[%s] expected non-nil ResponseStatus in last event", test.desc) + } else if int(status.Code) != test.code { + t.Errorf("[%s] expected ResponseStatus.Code=%d, got %d", test.desc, test.code, status.Code) + } + } + } +} diff --git a/pkg/endpoints/discovery/group.go b/pkg/endpoints/discovery/group.go index fd9798276..6ceda0131 100644 --- a/pkg/endpoints/discovery/group.go +++ b/pkg/endpoints/discovery/group.go @@ -70,5 +70,5 @@ func (s *APIGroupHandler) handle(req *restful.Request, resp *restful.Response) { } func (s *APIGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, w, req, http.StatusOK, &s.group) + responsewriters.WriteObjectNegotiated(nil, s.serializer, schema.GroupVersion{}, w, req, http.StatusOK, &s.group) } diff --git a/pkg/endpoints/discovery/legacy.go b/pkg/endpoints/discovery/legacy.go index fb648e528..65747cd04 100644 --- a/pkg/endpoints/discovery/legacy.go +++ b/pkg/endpoints/discovery/legacy.go @@ -74,5 +74,5 @@ func (s *legacyRootAPIHandler) handle(req *restful.Request, resp *restful.Respon Versions: s.apiVersions, } - responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions) + responsewriters.WriteObjectNegotiated(nil, s.serializer, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions) } diff --git a/pkg/endpoints/discovery/root.go b/pkg/endpoints/discovery/root.go index 0b798eafc..a436b3b36 100644 --- a/pkg/endpoints/discovery/root.go +++ b/pkg/endpoints/discovery/root.go @@ -111,7 +111,7 @@ func (s *rootAPIsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) groups[i].ServerAddressByClientCIDRs = serverCIDR } - responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups}) + responsewriters.WriteObjectNegotiated(nil, s.serializer, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups}) } func (s *rootAPIsHandler) restfulHandle(req *restful.Request, resp *restful.Response) { diff --git a/pkg/endpoints/discovery/version.go b/pkg/endpoints/discovery/version.go index 93f95034a..eac3f4f86 100644 --- a/pkg/endpoints/discovery/version.go +++ b/pkg/endpoints/discovery/version.go @@ -78,6 +78,6 @@ func (s *APIVersionHandler) handle(req *restful.Request, resp *restful.Response) } func (s *APIVersionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, w, req, http.StatusOK, + responsewriters.WriteObjectNegotiated(nil, s.serializer, schema.GroupVersion{}, w, req, http.StatusOK, &metav1.APIResourceList{GroupVersion: s.groupVersion.String(), APIResources: s.apiResourceLister.ListAPIResources()}) } diff --git a/pkg/endpoints/filters/audit.go b/pkg/endpoints/filters/audit.go index 9aefc9676..b64b40341 100644 --- a/pkg/endpoints/filters/audit.go +++ b/pkg/endpoints/filters/audit.go @@ -19,36 +19,154 @@ package filters import ( "bufio" "errors" - "fmt" - "io" "net" "net/http" - "strings" - "time" + "sync" - "github.com/golang/glog" - "github.com/pborman/uuid" + "fmt" - utilnet "k8s.io/apimachinery/pkg/util/net" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/request" - authenticationapi "k8s.io/client-go/pkg/apis/authentication/v1" ) +// WithAudit decorates a http.Handler with audit logging information for all the +// requests coming to the server. If out is nil, no decoration takes place. +// Each audit log contains two entries: +// 1. the request line containing: +// - unique id allowing to match the response line (see 2) +// - source ip of the request +// - HTTP method being invoked +// - original user invoking the operation +// - original user's groups info +// - impersonated user for the operation +// - impersonated groups info +// - namespace of the request or +// - uri is the full URI as requested +// 2. the response line containing: +// - the unique id from 1 +// - response code +func WithAudit(handler http.Handler, requestContextMapper request.RequestContextMapper, sink audit.Sink, policy *auditinternal.Policy, longRunningCheck request.LongRunningRequestCheck) http.Handler { + if sink == nil { + return handler + } + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx, ok := requestContextMapper.Get(req) + if !ok { + responsewriters.InternalError(w, req, errors.New("no context found for request")) + return + } + + attribs, err := GetAuthorizerAttributes(ctx) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to GetAuthorizerAttributes: %v", err)) + responsewriters.InternalError(w, req, errors.New("failed to parse request")) + return + } + + ev, err := audit.NewEventFromRequest(req, policy, attribs) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to complete audit event from request: %v", err)) + responsewriters.InternalError(w, req, errors.New("failed to update context")) + return + } + + ctx = request.WithAuditEvent(ctx, ev) + if err := requestContextMapper.Update(req, ctx); err != nil { + utilruntime.HandleError(fmt.Errorf("failed to attach audit event to the context: %v", err)) + responsewriters.InternalError(w, req, errors.New("failed to update context")) + return + } + + // intercept the status code + longRunning := false + var longRunningSink audit.Sink + if longRunningCheck != nil { + ri, _ := request.RequestInfoFrom(ctx) + if longRunning = longRunningCheck(req, ri); longRunning { + longRunningSink = sink + } + } + respWriter := decorateResponseWriter(w, ev, longRunningSink) + + // send audit event when we leave this func, either via a panic or cleanly. In the case of long + // running requests, this will be the second audit event. + defer func() { + if r := recover(); r != nil { + ev.ResponseStatus = &metav1.Status{ + Code: http.StatusInternalServerError, + } + sink.ProcessEvents(ev) + panic(r) + } + + if ev.ResponseStatus == nil { + ev.ResponseStatus = &metav1.Status{ + Code: 200, + } + } + + sink.ProcessEvents(ev) + }() + handler.ServeHTTP(respWriter, req) + }) +} + +func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink) http.ResponseWriter { + delegate := &auditResponseWriter{ + ResponseWriter: responseWriter, + event: ev, + sink: sink, + } + + // check if the ResponseWriter we're wrapping is the fancy one we need + // or if the basic is sufficient + _, cn := responseWriter.(http.CloseNotifier) + _, fl := responseWriter.(http.Flusher) + _, hj := responseWriter.(http.Hijacker) + if cn && fl && hj { + return &fancyResponseWriterDelegator{delegate} + } + return delegate +} + var _ http.ResponseWriter = &auditResponseWriter{} +// auditResponseWriter intercepts WriteHeader, sets it in the event. If the sink is set, it will +// create immediately an event (for long running requests). type auditResponseWriter struct { http.ResponseWriter - out io.Writer - id string + event *auditinternal.Event + once sync.Once + sink audit.Sink +} + +func (a *auditResponseWriter) processCode(code int) { + a.once.Do(func() { + if a.sink != nil { + a.sink.ProcessEvents(a.event) + } + + // for now we use the ResponseStatus as marker that it's the first or second event + // of a long running request. As soon as we have such a field in the event, we can + // change this. + if a.event.ResponseStatus == nil { + a.event.ResponseStatus = &metav1.Status{} + } + a.event.ResponseStatus.Code = int32(code) + }) +} + +func (a *auditResponseWriter) Write(bs []byte) (int, error) { + a.processCode(200) // the Go library calls WriteHeader internally if no code was written yet. But this will go unnoticed for us + return a.ResponseWriter.Write(bs) } func (a *auditResponseWriter) WriteHeader(code int) { - line := fmt.Sprintf("%s AUDIT: id=%q response=\"%d\"\n", time.Now().Format(time.RFC3339Nano), a.id, code) - if _, err := fmt.Fprint(a.out, line); err != nil { - glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err) - } - + a.processCode(code) a.ResponseWriter.WriteHeader(code) } @@ -74,89 +192,3 @@ func (f *fancyResponseWriterDelegator) Hijack() (net.Conn, *bufio.ReadWriter, er var _ http.CloseNotifier = &fancyResponseWriterDelegator{} var _ http.Flusher = &fancyResponseWriterDelegator{} var _ http.Hijacker = &fancyResponseWriterDelegator{} - -// WithAudit decorates a http.Handler with audit logging information for all the -// requests coming to the server. If out is nil, no decoration takes place. -// Each audit log contains two entries: -// 1. the request line containing: -// - unique id allowing to match the response line (see 2) -// - source ip of the request -// - HTTP method being invoked -// - original user invoking the operation -// - original user's groups info -// - impersonated user for the operation -// - impersonated groups info -// - namespace of the request or -// - uri is the full URI as requested -// 2. the response line containing: -// - the unique id from 1 -// - response code -func WithAudit(handler http.Handler, requestContextMapper request.RequestContextMapper, out io.Writer) http.Handler { - if out == nil { - return handler - } - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - ctx, ok := requestContextMapper.Get(req) - if !ok { - responsewriters.InternalError(w, req, errors.New("no context found for request")) - return - } - attribs, err := GetAuthorizerAttributes(ctx) - if err != nil { - responsewriters.InternalError(w, req, err) - return - } - - username := "" - groups := "" - if attribs.GetUser() != nil { - username = attribs.GetUser().GetName() - if userGroups := attribs.GetUser().GetGroups(); len(userGroups) > 0 { - groups = auditStringSlice(userGroups) - } - } - asuser := req.Header.Get(authenticationapi.ImpersonateUserHeader) - if len(asuser) == 0 { - asuser = "" - } - asgroups := "" - requestedGroups := req.Header[authenticationapi.ImpersonateGroupHeader] - if len(requestedGroups) > 0 { - asgroups = auditStringSlice(requestedGroups) - } - namespace := attribs.GetNamespace() - if len(namespace) == 0 { - namespace = "" - } - id := uuid.NewRandom().String() - - line := fmt.Sprintf("%s AUDIT: id=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q\n", - time.Now().Format(time.RFC3339Nano), id, utilnet.GetClientIP(req), req.Method, username, groups, asuser, asgroups, namespace, req.URL) - if _, err := fmt.Fprint(out, line); err != nil { - glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err) - } - respWriter := decorateResponseWriter(w, out, id) - handler.ServeHTTP(respWriter, req) - }) -} - -func auditStringSlice(inList []string) string { - quotedElements := make([]string, len(inList)) - for i, in := range inList { - quotedElements[i] = fmt.Sprintf("%q", in) - } - return strings.Join(quotedElements, ",") -} - -func decorateResponseWriter(responseWriter http.ResponseWriter, out io.Writer, id string) http.ResponseWriter { - delegate := &auditResponseWriter{ResponseWriter: responseWriter, out: out, id: id} - // check if the ResponseWriter we're wrapping is the fancy one we need - // or if the basic is sufficient - _, cn := responseWriter.(http.CloseNotifier) - _, fl := responseWriter.(http.Flusher) - _, hj := responseWriter.(http.Hijacker) - if cn && fl && hj { - return &fancyResponseWriterDelegator{delegate} - } - return delegate -} diff --git a/pkg/endpoints/filters/audit_test.go b/pkg/endpoints/filters/audit_test.go index 211fd5778..14dc84da4 100644 --- a/pkg/endpoints/filters/audit_test.go +++ b/pkg/endpoints/filters/audit_test.go @@ -19,24 +19,62 @@ package filters import ( "bufio" "bytes" - "io/ioutil" "net" "net/http" "net/http/httptest" "reflect" "regexp" "strings" + "sync" "testing" + "time" + "k8s.io/apimachinery/pkg/util/wait" + auditinternal "k8s.io/apiserver/pkg/apis/audit" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" + pluginlog "k8s.io/apiserver/plugin/pkg/audit/log" ) -type simpleResponseWriter struct { - http.ResponseWriter +type fakeAuditSink struct { + lock sync.Mutex + events []*auditinternal.Event } -func (*simpleResponseWriter) WriteHeader(code int) {} +func (s *fakeAuditSink) ProcessEvents(evs ...*auditinternal.Event) { + s.lock.Lock() + defer s.lock.Unlock() + s.events = append(s.events, evs...) +} + +func (s *fakeAuditSink) Events() []*auditinternal.Event { + s.lock.Lock() + defer s.lock.Unlock() + return append([]*auditinternal.Event{}, s.events...) +} + +func (s *fakeAuditSink) Pop(timeout time.Duration) (*auditinternal.Event, error) { + var result *auditinternal.Event + err := wait.Poll(50*time.Millisecond, wait.ForeverTestTimeout, wait.ConditionFunc(func() (done bool, err error) { + s.lock.Lock() + defer s.lock.Unlock() + if len(s.events) == 0 { + return false, nil + } + result = s.events[0] + s.events = s.events[1:] + return true, nil + })) + return result, err +} + +type simpleResponseWriter struct{} + +var _ http.ResponseWriter = &simpleResponseWriter{} + +func (*simpleResponseWriter) WriteHeader(code int) {} +func (*simpleResponseWriter) Write(bs []byte) (int, error) { return len(bs), nil } +func (*simpleResponseWriter) Header() http.Header { return http.Header{} } type fancyResponseWriter struct { simpleResponseWriter @@ -49,14 +87,14 @@ func (*fancyResponseWriter) Flush() {} func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil } func TestConstructResponseWriter(t *testing.T) { - actual := decorateResponseWriter(&simpleResponseWriter{}, ioutil.Discard, "") + actual := decorateResponseWriter(&simpleResponseWriter{}, nil, nil) switch v := actual.(type) { case *auditResponseWriter: default: t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v)) } - actual = decorateResponseWriter(&fancyResponseWriter{}, ioutil.Discard, "") + actual = decorateResponseWriter(&fancyResponseWriter{}, nil, nil) switch v := actual.(type) { case *fancyResponseWriterDelegator: default: @@ -64,6 +102,73 @@ func TestConstructResponseWriter(t *testing.T) { } } +func TestDecorateResponseWriterWithoutChannel(t *testing.T) { + ev := &auditinternal.Event{} + actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil) + + // write status. This will not block because firstEventSentCh is nil + actual.WriteHeader(42) + if ev.ResponseStatus == nil { + t.Fatalf("Expected ResponseStatus to be non-nil") + } + if ev.ResponseStatus.Code != 42 { + t.Errorf("expected status code 42, got %d", ev.ResponseStatus.Code) + } +} + +func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) { + ev := &auditinternal.Event{} + actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil) + + // write status. This will not block because firstEventSentCh is nil + actual.Write([]byte("foo")) + if ev.ResponseStatus == nil { + t.Fatalf("Expected ResponseStatus to be non-nil") + } + if ev.ResponseStatus.Code != 200 { + t.Errorf("expected status code 200, got %d", ev.ResponseStatus.Code) + } +} + +func TestDecorateResponseWriterChannel(t *testing.T) { + sink := &fakeAuditSink{} + ev := &auditinternal.Event{} + actual := decorateResponseWriter(&simpleResponseWriter{}, ev, sink) + + done := make(chan struct{}) + go func() { + t.Log("Writing status code 42") + actual.WriteHeader(42) + t.Log("Finished writing status code 42") + close(done) + + actual.Write([]byte("foo")) + }() + + // sleep some time to give write the possibility to do wrong stuff + time.Sleep(100 * time.Millisecond) + + t.Log("Waiting for event in the channel") + ev1, err := sink.Pop(time.Second) + if err != nil { + t.Fatal("Timeout waiting for events") + } + t.Logf("Seen event with status %v", ev1.ResponseStatus) + + if ev != ev1 { + t.Fatalf("ev1 and ev must be equal") + } + + <-done + t.Log("Seen the go routine finished") + + // write again + _, err = actual.Write([]byte("foo")) + if err != nil { + t.Errorf("unexpected error: %v", err) + } +} + type fakeHTTPHandler struct{} func (*fakeHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -71,32 +176,189 @@ func (*fakeHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } func TestAudit(t *testing.T) { - var buf bytes.Buffer + shortRunningPrefix := `[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="list" user="admin" groups="" as="" asgroups="" namespace="default" uri="/api/v1/namespaces/default/pods"` + longRunningPrefix := `[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="watch" user="admin" groups="" as="" asgroups="" namespace="default" uri="/api/v1/namespaces/default/pods\?watch=true"` - handler := WithAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{ - user: &user.DefaultInfo{Name: "admin"}, - }, &buf) + shortRunningPath := "/api/v1/namespaces/default/pods" + longRunningPath := "/api/v1/namespaces/default/pods?watch=true" - req, _ := http.NewRequest("GET", "/api/v1/namespaces/default/pods", nil) - req.RemoteAddr = "127.0.0.1" - handler.ServeHTTP(httptest.NewRecorder(), req) - line := strings.Split(strings.TrimSpace(buf.String()), "\n") - if len(line) != 2 { - t.Fatalf("Unexpected amount of lines in audit log: %d", len(line)) - } - match, err := regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="GET" user="admin" groups="" as="" asgroups="" namespace="default" uri="/api/v1/namespaces/default/pods"`, line[0]) - if err != nil { - t.Errorf("Unexpected error matching first line: %v", err) - } - if !match { - t.Errorf("Unexpected first line of audit: %s", line[0]) - } - match, err = regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" response="200"`, line[1]) - if err != nil { - t.Errorf("Unexpected error matching second line: %v", err) - } - if !match { - t.Errorf("Unexpected second line of audit: %s", line[1]) + delay := 500 * time.Millisecond + + for _, test := range []struct { + desc string + path string + handler func(http.ResponseWriter, *http.Request) + expected []string + }{ + // short running requests + { + "empty", + shortRunningPath, + func(http.ResponseWriter, *http.Request) {}, + []string{ + shortRunningPrefix + ` response="200"`, + }, + }, + { + "sleep", + shortRunningPath, + func(http.ResponseWriter, *http.Request) { + time.Sleep(delay) + }, + []string{ + shortRunningPrefix + ` response="200"`, + }, + }, + { + "403+write", + shortRunningPath, + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(403) + w.Write([]byte("foo")) + }, + []string{ + shortRunningPrefix + ` response="403"`, + }, + }, + { + "panic", + shortRunningPath, + func(w http.ResponseWriter, req *http.Request) { + panic("kaboom") + }, + []string{ + shortRunningPrefix + ` response="500"`, + }, + }, + { + "write+panic", + shortRunningPath, + func(w http.ResponseWriter, req *http.Request) { + w.Write([]byte("foo")) + panic("kaboom") + }, + []string{ + shortRunningPrefix + ` response="500"`, + }, + }, + + // long running requests + { + "empty longrunning", + longRunningPath, + func(http.ResponseWriter, *http.Request) {}, + []string{ + longRunningPrefix + ` response="200"`, + }, + }, + { + "sleep longrunning", + longRunningPath, + func(http.ResponseWriter, *http.Request) { + time.Sleep(delay) + }, + []string{ + longRunningPrefix + ` response="200"`, + }, + }, + { + "sleep+403 longrunning", + longRunningPath, + func(w http.ResponseWriter, req *http.Request) { + time.Sleep(delay) + w.WriteHeader(403) + }, + []string{ + longRunningPrefix + ` response=""`, + longRunningPrefix + ` response="403"`, + }, + }, + { + "write longrunning", + longRunningPath, + func(w http.ResponseWriter, req *http.Request) { + w.Write([]byte("foo")) + }, + []string{ + longRunningPrefix + ` response=""`, + longRunningPrefix + ` response="200"`, + }, + }, + { + "403+write longrunning", + longRunningPath, + func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(403) + w.Write([]byte("foo")) + }, + []string{ + longRunningPrefix + ` response=""`, + longRunningPrefix + ` response="403"`, + }, + }, + { + "panic longrunning", + longRunningPath, + func(w http.ResponseWriter, req *http.Request) { + panic("kaboom") + }, + []string{ + longRunningPrefix + ` response="500"`, + }, + }, + { + "write+panic longrunning", + longRunningPath, + func(w http.ResponseWriter, req *http.Request) { + w.Write([]byte("foo")) + panic("kaboom") + }, + []string{ + longRunningPrefix + ` response=""`, + longRunningPrefix + ` response="500"`, + }, + }, + } { + var buf bytes.Buffer + backend := pluginlog.NewBackend(&buf) + handler := WithAudit(http.HandlerFunc(test.handler), &fakeRequestContextMapper{ + user: &user.DefaultInfo{Name: "admin"}, + }, backend, auditinternal.NewConstantPolicy(auditinternal.LevelRequestResponse), func(r *http.Request, ri *request.RequestInfo) bool { + // simplified long-running check + return ri.Verb == "watch" + }) + + req, _ := http.NewRequest("GET", test.path, nil) + req.RemoteAddr = "127.0.0.1" + + done := make(chan struct{}) + go func() { + defer func() { + recover() + close(done) + }() + handler.ServeHTTP(httptest.NewRecorder(), req) + }() + <-done + + t.Logf("[%s] audit log: %v", test.desc, buf.String()) + + line := strings.Split(strings.TrimSpace(buf.String()), "\n") + if len(line) != len(test.expected) { + t.Errorf("[%s] Unexpected amount of lines in audit log: %d", test.desc, len(line)) + continue + } + + for i, re := range test.expected { + match, err := regexp.MatchString(re, line[i]) + if err != nil { + t.Errorf("[%s] Unexpected error matching line %d: %v", test.desc, i, err) + continue + } + if !match { + t.Errorf("[%s] Unexpected line %d of audit: %s", test.desc, i, line[i]) + } + } } } @@ -124,29 +386,8 @@ func (*fakeRequestContextMapper) Update(req *http.Request, context request.Conte } func TestAuditNoPanicOnNilUser(t *testing.T) { - var buf bytes.Buffer - - handler := WithAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{}, &buf) - + handler := WithAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{}, &fakeAuditSink{}, auditinternal.NewConstantPolicy(auditinternal.LevelRequestResponse), nil) req, _ := http.NewRequest("GET", "/api/v1/namespaces/default/pods", nil) req.RemoteAddr = "127.0.0.1" handler.ServeHTTP(httptest.NewRecorder(), req) - line := strings.Split(strings.TrimSpace(buf.String()), "\n") - if len(line) != 2 { - t.Fatalf("Unexpected amount of lines in audit log: %d", len(line)) - } - match, err := regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="GET" user="" groups="" as="" asgroups="" namespace="default" uri="/api/v1/namespaces/default/pods"`, line[0]) - if err != nil { - t.Errorf("Unexpected error matching first line: %v", err) - } - if !match { - t.Errorf("Unexpected first line of audit: %s", line[0]) - } - match, err = regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" response="200"`, line[1]) - if err != nil { - t.Errorf("Unexpected error matching second line: %v", err) - } - if !match { - t.Errorf("Unexpected second line of audit: %s", line[1]) - } } diff --git a/pkg/endpoints/filters/legacy_audit.go b/pkg/endpoints/filters/legacy_audit.go new file mode 100644 index 000000000..98353ed01 --- /dev/null +++ b/pkg/endpoints/filters/legacy_audit.go @@ -0,0 +1,162 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "bufio" + "errors" + "fmt" + "io" + "net" + "net/http" + "strings" + "time" + + "github.com/golang/glog" + "github.com/pborman/uuid" + + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/apiserver/pkg/endpoints/request" + authenticationapi "k8s.io/client-go/pkg/apis/authentication/v1" +) + +var _ http.ResponseWriter = &legacyAuditResponseWriter{} + +type legacyAuditResponseWriter struct { + http.ResponseWriter + out io.Writer + id string +} + +func (a *legacyAuditResponseWriter) WriteHeader(code int) { + line := fmt.Sprintf("%s AUDIT: id=%q response=\"%d\"\n", time.Now().Format(time.RFC3339Nano), a.id, code) + if _, err := fmt.Fprint(a.out, line); err != nil { + glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err) + } + + a.ResponseWriter.WriteHeader(code) +} + +// fancyLegacyResponseWriterDelegator implements http.CloseNotifier, http.Flusher and +// http.Hijacker which are needed to make certain http operation (e.g. watch, rsh, etc) +// working. +type fancyLegacyResponseWriterDelegator struct { + *legacyAuditResponseWriter +} + +func (f *fancyLegacyResponseWriterDelegator) CloseNotify() <-chan bool { + return f.ResponseWriter.(http.CloseNotifier).CloseNotify() +} + +func (f *fancyLegacyResponseWriterDelegator) Flush() { + f.ResponseWriter.(http.Flusher).Flush() +} + +func (f *fancyLegacyResponseWriterDelegator) Hijack() (net.Conn, *bufio.ReadWriter, error) { + return f.ResponseWriter.(http.Hijacker).Hijack() +} + +var _ http.CloseNotifier = &fancyLegacyResponseWriterDelegator{} +var _ http.Flusher = &fancyLegacyResponseWriterDelegator{} +var _ http.Hijacker = &fancyLegacyResponseWriterDelegator{} + +// WithAudit decorates a http.Handler with audit logging information for all the +// requests coming to the server. If out is nil, no decoration takes place. +// Each audit log contains two entries: +// 1. the request line containing: +// - unique id allowing to match the response line (see 2) +// - source ip of the request +// - HTTP method being invoked +// - original user invoking the operation +// - original user's groups info +// - impersonated user for the operation +// - impersonated groups info +// - namespace of the request or +// - uri is the full URI as requested +// 2. the response line containing: +// - the unique id from 1 +// - response code +func WithLegacyAudit(handler http.Handler, requestContextMapper request.RequestContextMapper, out io.Writer) http.Handler { + if out == nil { + return handler + } + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx, ok := requestContextMapper.Get(req) + if !ok { + responsewriters.InternalError(w, req, errors.New("no context found for request")) + return + } + attribs, err := GetAuthorizerAttributes(ctx) + if err != nil { + responsewriters.InternalError(w, req, err) + return + } + + username := "" + groups := "" + if attribs.GetUser() != nil { + username = attribs.GetUser().GetName() + if userGroups := attribs.GetUser().GetGroups(); len(userGroups) > 0 { + groups = auditStringSlice(userGroups) + } + } + asuser := req.Header.Get(authenticationapi.ImpersonateUserHeader) + if len(asuser) == 0 { + asuser = "" + } + asgroups := "" + requestedGroups := req.Header[authenticationapi.ImpersonateGroupHeader] + if len(requestedGroups) > 0 { + asgroups = auditStringSlice(requestedGroups) + } + namespace := attribs.GetNamespace() + if len(namespace) == 0 { + namespace = "" + } + id := uuid.NewRandom().String() + + line := fmt.Sprintf("%s AUDIT: id=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q\n", + time.Now().Format(time.RFC3339Nano), id, utilnet.GetClientIP(req), req.Method, username, groups, asuser, asgroups, namespace, req.URL) + if _, err := fmt.Fprint(out, line); err != nil { + glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err) + } + respWriter := legacyDecorateResponseWriter(w, out, id) + handler.ServeHTTP(respWriter, req) + }) +} + +func auditStringSlice(inList []string) string { + quotedElements := make([]string, len(inList)) + for i, in := range inList { + quotedElements[i] = fmt.Sprintf("%q", in) + } + return strings.Join(quotedElements, ",") +} + +func legacyDecorateResponseWriter(responseWriter http.ResponseWriter, out io.Writer, id string) http.ResponseWriter { + delegate := &legacyAuditResponseWriter{ResponseWriter: responseWriter, out: out, id: id} + // check if the ResponseWriter we're wrapping is the fancy one we need + // or if the basic is sufficient + _, cn := responseWriter.(http.CloseNotifier) + _, fl := responseWriter.(http.Flusher) + _, hj := responseWriter.(http.Hijacker) + if cn && fl && hj { + return &fancyLegacyResponseWriterDelegator{delegate} + } + return delegate +} diff --git a/pkg/endpoints/filters/legacy_audit_test.go b/pkg/endpoints/filters/legacy_audit_test.go new file mode 100644 index 000000000..7a833a28e --- /dev/null +++ b/pkg/endpoints/filters/legacy_audit_test.go @@ -0,0 +1,104 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "bytes" + "io/ioutil" + "net/http" + "net/http/httptest" + "reflect" + "regexp" + "strings" + "testing" + + "k8s.io/apiserver/pkg/authentication/user" +) + +func TestLegacyConstructResponseWriter(t *testing.T) { + actual := legacyDecorateResponseWriter(&simpleResponseWriter{}, ioutil.Discard, "") + switch v := actual.(type) { + case *legacyAuditResponseWriter: + default: + t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v)) + } + + actual = legacyDecorateResponseWriter(&fancyResponseWriter{}, ioutil.Discard, "") + switch v := actual.(type) { + case *fancyLegacyResponseWriterDelegator: + default: + t.Errorf("Expected fancyResponseWriterDelegator, got %v", reflect.TypeOf(v)) + } +} + +func TestLegacyAudit(t *testing.T) { + var buf bytes.Buffer + + handler := WithLegacyAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{ + user: &user.DefaultInfo{Name: "admin"}, + }, &buf) + + req, _ := http.NewRequest("GET", "/api/v1/namespaces/default/pods", nil) + req.RemoteAddr = "127.0.0.1" + handler.ServeHTTP(httptest.NewRecorder(), req) + line := strings.Split(strings.TrimSpace(buf.String()), "\n") + if len(line) != 2 { + t.Fatalf("Unexpected amount of lines in audit log: %d", len(line)) + } + match, err := regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="GET" user="admin" groups="" as="" asgroups="" namespace="default" uri="/api/v1/namespaces/default/pods"`, line[0]) + if err != nil { + t.Errorf("Unexpected error matching first line: %v", err) + } + if !match { + t.Errorf("Unexpected first line of audit: %s", line[0]) + } + match, err = regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" response="200"`, line[1]) + if err != nil { + t.Errorf("Unexpected error matching second line: %v", err) + } + if !match { + t.Errorf("Unexpected second line of audit: %s", line[1]) + } +} + +func TestLegacyAuditNoPanicOnNilUser(t *testing.T) { + var buf bytes.Buffer + + handler := WithLegacyAudit(&fakeHTTPHandler{}, &fakeRequestContextMapper{}, &buf) + + req, _ := http.NewRequest("GET", "/api/v1/namespaces/default/pods", nil) + req.RemoteAddr = "127.0.0.1" + handler.ServeHTTP(httptest.NewRecorder(), req) + line := strings.Split(strings.TrimSpace(buf.String()), "\n") + if len(line) != 2 { + t.Fatalf("Unexpected amount of lines in audit log: %d", len(line)) + } + match, err := regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="GET" user="" groups="" as="" asgroups="" namespace="default" uri="/api/v1/namespaces/default/pods"`, line[0]) + if err != nil { + t.Errorf("Unexpected error matching first line: %v", err) + } + if !match { + t.Errorf("Unexpected first line of audit: %s", line[0]) + } + match, err = regexp.MatchString(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" response="200"`, line[1]) + if err != nil { + t.Errorf("Unexpected error matching second line: %v", err) + } + if !match { + t.Errorf("Unexpected second line of audit: %s", line[1]) + } +} diff --git a/pkg/endpoints/handlers/proxy.go b/pkg/endpoints/handlers/proxy.go index fa8b1b16b..586ab494c 100644 --- a/pkg/endpoints/handlers/proxy.go +++ b/pkg/endpoints/handlers/proxy.go @@ -118,14 +118,14 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { redirector, ok := storage.(rest.Redirector) if !ok { httplog.LogOf(req, w).Addf("'%v' is not a redirector", resource) - httpCode = responsewriters.ErrorNegotiated(apierrors.NewMethodNotSupported(schema.GroupResource{Resource: resource}, "proxy"), r.Serializer, gv, w, req) + httpCode = responsewriters.ErrorNegotiated(ctx, apierrors.NewMethodNotSupported(schema.GroupResource{Resource: resource}, "proxy"), r.Serializer, gv, w, req) return } location, roundTripper, err := redirector.ResourceLocation(ctx, id) if err != nil { httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err) - httpCode = responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req) + httpCode = responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req) return } if location == nil { @@ -158,7 +158,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { newReq, err := http.NewRequest(req.Method, location.String(), req.Body) if err != nil { - httpCode = responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req) + httpCode = responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req) return } httpCode = http.StatusOK @@ -171,7 +171,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // TODO convert this entire proxy to an UpgradeAwareProxy similar to // https://github.com/openshift/origin/blob/master/pkg/util/httpproxy/upgradeawareproxy.go. // That proxy needs to be modified to support multiple backends, not just 1. - if r.tryUpgrade(w, req, newReq, location, roundTripper, gv) { + if r.tryUpgrade(w, req, newReq, location, roundTripper, gv, ctx) { return } @@ -220,13 +220,13 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } // tryUpgrade returns true if the request was handled. -func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Request, location *url.URL, transport http.RoundTripper, gv schema.GroupVersion) bool { +func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Request, location *url.URL, transport http.RoundTripper, gv schema.GroupVersion, ctx request.Context) bool { if !httpstream.IsUpgradeRequest(req) { return false } backendConn, err := proxyutil.DialURL(location, transport) if err != nil { - responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req) + responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req) return true } defer backendConn.Close() @@ -236,13 +236,13 @@ func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Reque // hijack, just for reference... requestHijackedConn, _, err := w.(http.Hijacker).Hijack() if err != nil { - responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req) + responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req) return true } defer requestHijackedConn.Close() if err = newReq.Write(backendConn); err != nil { - responsewriters.ErrorNegotiated(err, r.Serializer, gv, w, req) + responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req) return true } diff --git a/pkg/endpoints/handlers/responsewriters/writers.go b/pkg/endpoints/handlers/responsewriters/writers.go index 78faefbac..1e8855753 100644 --- a/pkg/endpoints/handlers/responsewriters/writers.go +++ b/pkg/endpoints/handlers/responsewriters/writers.go @@ -26,7 +26,9 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" + "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/util/flushwriter" "k8s.io/apiserver/pkg/util/wsstream" @@ -37,16 +39,16 @@ import ( // response. The Accept header and current API version will be passed in, and the output will be copied // directly to the response body. If content type is returned it is used, otherwise the content type will // be "application/octet-stream". All other objects are sent to standard JSON serialization. -func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) { +func WriteObject(ctx request.Context, statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) { stream, ok := object.(rest.ResourceStreamer) if !ok { - WriteObjectNegotiated(s, gv, w, req, statusCode, object) + WriteObjectNegotiated(ctx, s, gv, w, req, statusCode, object) return } out, flush, contentType, err := stream.InputStream(gv.String(), req.Header.Get("Accept")) if err != nil { - ErrorNegotiated(err, s, gv, w, req) + ErrorNegotiated(ctx, err, s, gv, w, req) return } if out == nil { @@ -76,8 +78,9 @@ func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSer io.Copy(writer, out) } -// WriteObjectNegotiated renders an object in the content type negotiated by the client -func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) { +// WriteObjectNegotiated renders an object in the content type negotiated by the client. +// The context is optional and can be nil. +func WriteObjectNegotiated(ctx request.Context, s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) { serializer, err := negotiation.NegotiateOutputSerializer(req, s) if err != nil { status := apiStatus(err) @@ -85,6 +88,10 @@ func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersio return } + if ae := request.AuditEventFrom(ctx); ae != nil { + audit.LogResponseObject(ae, object, gv, s) + } + w.Header().Set("Content-Type", serializer.MediaType) w.WriteHeader(statusCode) @@ -95,7 +102,8 @@ func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersio } // ErrorNegotiated renders an error to the response. Returns the HTTP status code of the error. -func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request) int { +// The context is options and may be nil. +func ErrorNegotiated(ctx request.Context, err error, s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request) int { status := apiStatus(err) code := int(status.Code) // when writing an error, check to see if the status indicates a retry after period @@ -109,7 +117,7 @@ func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupV return code } - WriteObjectNegotiated(s, gv, w, req, code, status) + WriteObjectNegotiated(ctx, s, gv, w, req, code, status) return code } diff --git a/pkg/endpoints/handlers/rest.go b/pkg/endpoints/handlers/rest.go index 8282178aa..854091c85 100644 --- a/pkg/endpoints/handlers/rest.go +++ b/pkg/endpoints/handlers/rest.go @@ -42,6 +42,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/request" @@ -72,7 +73,8 @@ type RequestScope struct { } func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Request) { - responsewriters.ErrorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), w, req) + ctx := scope.ContextFunc(req) + responsewriters.ErrorNegotiated(ctx, err, scope.Serializer, scope.Kind.GroupVersion(), w, req) } // getterFunc performs a get request with the given context and object name. The request @@ -111,8 +113,9 @@ func getResourceHandler(scope RequestScope, getter getterFunc) http.HandlerFunc scope.err(err, w, req) return } + trace.Step("About to write a response") - responsewriters.WriteObject(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req) + responsewriters.WriteObject(ctx, http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req) } } @@ -231,7 +234,8 @@ type responder struct { } func (r *responder) Object(statusCode int, obj runtime.Object) { - responsewriters.WriteObject(statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.w, r.req) + ctx := r.scope.ContextFunc(r.req) + responsewriters.WriteObject(ctx, statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.w, r.req) } func (r *responder) Error(err error) { @@ -343,7 +347,8 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch return } } - responsewriters.WriteObject(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req) + + responsewriters.WriteObject(ctx, http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req) trace.Step(fmt.Sprintf("Writing http response done (%d items)", numberOfItems)) } } @@ -404,6 +409,9 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object } trace.Step("Conversion done") + ae := request.AuditEventFrom(ctx) + audit.LogRequestObject(ae, obj, scope.Resource.GroupVersion(), scope.Serializer) + if admit != nil && admit.Handles(admission.Create) { userInfo, _ := request.UserFrom(ctx) @@ -439,7 +447,7 @@ func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.Object } trace.Step("Self-link added") - responsewriters.WriteObject(http.StatusCreated, scope.Kind.GroupVersion(), scope.Serializer, result, w, req) + responsewriters.WriteObject(ctx, http.StatusCreated, scope.Kind.GroupVersion(), scope.Serializer, result, w, req) } } @@ -499,6 +507,9 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface return } + ae := request.AuditEventFrom(ctx) + audit.LogRequestPatch(ae, patchJS) + s, ok := runtime.SerializerInfoForMediaType(scope.Serializer.SupportedMediaTypes(), runtime.ContentTypeJSON) if !ok { scope.err(fmt.Errorf("no serializer defined for JSON"), w, req) @@ -536,7 +547,7 @@ func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface return } - responsewriters.WriteObject(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req) + responsewriters.WriteObject(ctx, http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req) } } @@ -822,6 +833,9 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType } trace.Step("Conversion done") + ae := request.AuditEventFrom(ctx) + audit.LogRequestObject(ae, obj, scope.Resource.GroupVersion(), scope.Serializer) + if err := checkName(obj, name, namespace, scope.Namer); err != nil { scope.err(err, w, req) return @@ -863,7 +877,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType if wasCreated { status = http.StatusCreated } - responsewriters.WriteObject(status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req) + responsewriters.WriteObject(ctx, status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req) } } @@ -910,6 +924,9 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestSco scope.err(fmt.Errorf("decoded object cannot be converted to DeleteOptions"), w, req) return } + + ae := request.AuditEventFrom(ctx) + audit.LogRequestObject(ae, obj, scope.Resource.GroupVersion(), scope.Serializer) } else { if values := req.URL.Query(); len(values) > 0 { if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil { @@ -978,7 +995,8 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestSco } } } - responsewriters.WriteObject(status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req) + + responsewriters.WriteObject(ctx, status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req) } } @@ -1050,6 +1068,9 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco scope.err(fmt.Errorf("decoded object cannot be converted to DeleteOptions"), w, req) return } + + ae := request.AuditEventFrom(ctx) + audit.LogRequestObject(ae, obj, scope.Resource.GroupVersion(), scope.Serializer) } } @@ -1080,7 +1101,8 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestSco } } } - responsewriters.WriteObjectNegotiated(scope.Serializer, scope.Kind.GroupVersion(), w, req, http.StatusOK, result) + + responsewriters.WriteObjectNegotiated(ctx, scope.Serializer, scope.Kind.GroupVersion(), w, req, http.StatusOK, result) } } diff --git a/pkg/endpoints/request/context.go b/pkg/endpoints/request/context.go index cc64aa194..b63b84dc7 100644 --- a/pkg/endpoints/request/context.go +++ b/pkg/endpoints/request/context.go @@ -22,6 +22,7 @@ import ( "golang.org/x/net/context" "k8s.io/apimachinery/pkg/types" + "k8s.io/apiserver/pkg/apis/audit" "k8s.io/apiserver/pkg/authentication/user" ) @@ -63,6 +64,9 @@ const ( // userAgentKey is the context key for the request user agent. userAgentKey + // auditKey is the context key for the audit event. + auditKey + namespaceDefault = "default" // TODO(sttts): solve import cycle when using metav1.NamespaceDefault ) @@ -143,3 +147,14 @@ func UserAgentFrom(ctx Context) (string, bool) { userAgent, ok := ctx.Value(userAgentKey).(string) return userAgent, ok } + +// WithAuditEvent returns set audit event struct. +func WithAuditEvent(parent Context, ev *audit.Event) Context { + return WithValue(parent, auditKey, ev) +} + +// AuditEventFrom returns the audit event struct on the ctx +func AuditEventFrom(ctx Context) *audit.Event { + ev, _ := ctx.Value(auditKey).(*audit.Event) + return ev +} diff --git a/pkg/server/config.go b/pkg/server/config.go index bb6710f05..24978a2ff 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -20,7 +20,6 @@ import ( "crypto/tls" "crypto/x509" "fmt" - "io" "net" "net/http" goruntime "runtime" @@ -39,6 +38,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/admission" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticatorfactory" authenticatorunion "k8s.io/apiserver/pkg/authentication/request/union" @@ -100,8 +101,11 @@ type Config struct { // Version will enable the /version endpoint if non-nil Version *version.Info - // AuditWriter is the destination for audit logs. If nil, they will not be written. - AuditWriter io.Writer + // AuditBackend is where audit events are sent to. + AuditBackend audit.Backend + // AuditPolicy defines rules which determine the audit level for different requests. + AuditPolicy *auditinternal.Policy + // SupportsBasicAuth indicates that's at least one Authenticator supports basic auth // If this is true, a basic auth challenge is returned on authentication failure // TODO(roberthbailey): Remove once the server no longer supports http basic auth. @@ -374,7 +378,7 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ handlerChainBuilder := func(handler http.Handler) http.Handler { return c.BuildHandlerChainFunc(handler, c.Config) } - apiServerHandler := NewAPIServerHandler(c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler()) + apiServerHandler := NewAPIServerHandler(c.RequestContextMapper, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler()) s := &GenericAPIServer{ discoveryAddresses: c.DiscoveryAddresses, @@ -383,6 +387,7 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ admissionControl: c.AdmissionControl, requestContextMapper: c.RequestContextMapper, Serializer: c.Serializer, + AuditBackend: c.AuditBackend, minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second, @@ -452,7 +457,8 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler := genericapifilters.WithAuthorization(apiHandler, c.RequestContextMapper, c.Authorizer) handler = genericapifilters.WithImpersonation(handler, c.RequestContextMapper, c.Authorizer) - handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, c.AuditWriter) + // TODO(audit): use WithLegacyAudit if feature flag is false + handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicy, c.LongRunningFunc) handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, genericapifilters.Unauthorized(c.SupportsBasicAuth)) handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") handler = genericfilters.WithPanicRecovery(handler) diff --git a/pkg/server/genericapiserver.go b/pkg/server/genericapiserver.go index a11148071..27aac3c97 100644 --- a/pkg/server/genericapiserver.go +++ b/pkg/server/genericapiserver.go @@ -37,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/audit" genericapi "k8s.io/apiserver/pkg/endpoints" "k8s.io/apiserver/pkg/endpoints/discovery" apirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -141,6 +142,9 @@ type GenericAPIServer struct { healthzLock sync.Mutex healthzChecks []healthz.HealthzChecker healthzCreated bool + + // auditing. The backend is started after the server starts listening. + AuditBackend audit.Backend } // DelegationTarget is an interface which allows for composition of API servers with top level handling that works @@ -272,6 +276,14 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { close(internalStopCh) }() + // Start the audit backend before any request comes in. This means we cannot turn it into a + // post start hook because without calling Backend.Run the Backend.ProcessEvents call might block. + if s.AuditBackend != nil { + if err := s.AuditBackend.Run(stopCh); err != nil { + return fmt.Errorf("failed to run the audit backend: %v", err) + } + } + s.RunPostStartHooks(stopCh) if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil { diff --git a/pkg/server/handler.go b/pkg/server/handler.go index 7c205f005..89da8e314 100644 --- a/pkg/server/handler.go +++ b/pkg/server/handler.go @@ -109,11 +109,12 @@ func logStackOnRecover(s runtime.NegotiatedSerializer, panicReason interface{}, if ct := w.Header().Get("Content-Type"); len(ct) > 0 { headers.Set("Accept", ct) } - responsewriters.ErrorNegotiated(apierrors.NewGenericServerResponse(http.StatusInternalServerError, "", schema.GroupResource{}, "", "", 0, false), s, schema.GroupVersion{}, w, &http.Request{Header: headers}) + responsewriters.ErrorNegotiated(nil, apierrors.NewGenericServerResponse(http.StatusInternalServerError, "", schema.GroupResource{}, "", "", 0, false), s, schema.GroupVersion{}, w, &http.Request{Header: headers}) } func serviceErrorHandler(s runtime.NegotiatedSerializer, serviceErr restful.ServiceError, request *restful.Request, resp *restful.Response) { responsewriters.ErrorNegotiated( + nil, apierrors.NewGenericServerResponse(serviceErr.Code, "", schema.GroupResource{}, "", serviceErr.Message, 0, false), s, schema.GroupVersion{}, diff --git a/pkg/server/options/audit.go b/pkg/server/options/audit.go index a272b025f..c9362d231 100644 --- a/pkg/server/options/audit.go +++ b/pkg/server/options/audit.go @@ -17,12 +17,14 @@ limitations under the License. package options import ( + "io" "os" "github.com/spf13/pflag" "gopkg.in/natefinch/lumberjack.v2" "k8s.io/apiserver/pkg/server" + pluginlog "k8s.io/apiserver/plugin/pkg/audit/log" ) type AuditLogOptions struct { @@ -52,16 +54,15 @@ func (o *AuditLogOptions) ApplyTo(c *server.Config) error { return nil } - if o.Path == "-" { - c.AuditWriter = os.Stdout - return nil - } - - c.AuditWriter = &lumberjack.Logger{ - Filename: o.Path, - MaxAge: o.MaxAge, - MaxBackups: o.MaxBackups, - MaxSize: o.MaxSize, + var w io.Writer = os.Stdout + if o.Path != "-" { + w = &lumberjack.Logger{ + Filename: o.Path, + MaxAge: o.MaxAge, + MaxBackups: o.MaxBackups, + MaxSize: o.MaxSize, + } } + c.AuditBackend = pluginlog.NewBackend(w) return nil } diff --git a/plugin/pkg/audit/doc.go b/plugin/pkg/audit/doc.go new file mode 100644 index 000000000..a819ff379 --- /dev/null +++ b/plugin/pkg/audit/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package auduĂșt contains implementations for pkg/audit/AuditBackend interface +package audit // import "k8s.io/apiserver/plugin/pkg/audit" diff --git a/plugin/pkg/audit/log/backend.go b/plugin/pkg/audit/log/backend.go new file mode 100644 index 000000000..fe15a77d8 --- /dev/null +++ b/plugin/pkg/audit/log/backend.go @@ -0,0 +1,102 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package log + +import ( + "fmt" + "io" + "strconv" + "strings" + "time" + + "github.com/golang/glog" + + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/audit" +) + +type backend struct { + out io.Writer + sink chan *auditinternal.Event +} + +var _ audit.Backend = &backend{} + +func NewBackend(out io.Writer) *backend { + return &backend{ + out: out, + sink: make(chan *auditinternal.Event, 100), + } +} + +func (b *backend) ProcessEvents(events ...*auditinternal.Event) { + for _, ev := range events { + b.logEvent(ev) + } +} + +func (b *backend) logEvent(ev *auditinternal.Event) { + username := "" + groups := "" + if len(ev.User.Username) > 0 { + username = ev.User.Username + if len(ev.User.Groups) > 0 { + groups = auditStringSlice(ev.User.Groups) + } + } + asuser := "" + asgroups := "" + if ev.ImpersonatedUser != nil { + asuser = ev.ImpersonatedUser.Username + if ev.ImpersonatedUser.Groups != nil { + asgroups = auditStringSlice(ev.ImpersonatedUser.Groups) + } + } + + namespace := "" + if ev.ObjectRef != nil && len(ev.ObjectRef.Namespace) != 0 { + namespace = ev.ObjectRef.Namespace + } + + response := "" + if ev.ResponseStatus != nil { + response = strconv.Itoa(int(ev.ResponseStatus.Code)) + } + + ip := "" + if len(ev.SourceIPs) > 0 { + ip = ev.SourceIPs[0] + } + + line := fmt.Sprintf("%s AUDIT: id=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q response=\"%s\"\n", + ev.Timestamp.Format(time.RFC3339Nano), ev.AuditID, ip, ev.Verb, username, groups, asuser, asgroups, namespace, ev.RequestURI, response) + if _, err := fmt.Fprint(b.out, line); err != nil { + glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err) + } +} + +func (b *backend) Run(stopCh <-chan struct{}) error { + return nil +} + +func auditStringSlice(inList []string) string { + quotedElements := make([]string, len(inList)) + for i, in := range inList { + quotedElements[i] = fmt.Sprintf("%q", in) + } + return strings.Join(quotedElements, ",") +}