audit: uniform 2 or 3 events for short/long running requests

Kubernetes-commit: 548f7be8fa10b6cbedcf179af088536e76a6c0e3
This commit is contained in:
Dr. Stefan Schimanski 2017-05-24 11:06:38 +02:00 committed by Kubernetes Publisher
parent 636c532e31
commit a177d01bf0
4 changed files with 118 additions and 46 deletions

View File

@ -127,7 +127,7 @@ func TestAudit(t *testing.T) {
}, },
selfLinker, selfLinker,
200, 200,
1, 2,
[]eventCheck{ []eventCheck{
noRequestBody(0), noRequestBody(0),
responseBodyMatches(0, `{.*"name":"c".*}`), responseBodyMatches(0, `{.*"name":"c".*}`),
@ -144,7 +144,7 @@ func TestAudit(t *testing.T) {
namespace: "other", namespace: "other",
}, },
200, 200,
1, 2,
[]eventCheck{ []eventCheck{
noRequestBody(0), noRequestBody(0),
responseBodyMatches(0, `{.*"name":"a".*"name":"b".*}`), responseBodyMatches(0, `{.*"name":"a".*"name":"b".*}`),
@ -157,7 +157,7 @@ func TestAudit(t *testing.T) {
}, },
selfLinker, selfLinker,
201, 201,
1, 2,
[]eventCheck{ []eventCheck{
requestBodyIs(0, string(simpleFooJSON)), requestBodyIs(0, string(simpleFooJSON)),
responseBodyMatches(0, `{.*"foo".*}`), responseBodyMatches(0, `{.*"foo".*}`),
@ -170,7 +170,7 @@ func TestAudit(t *testing.T) {
}, },
selfLinker, selfLinker,
405, 405,
1, 2,
[]eventCheck{ []eventCheck{
noRequestBody(0), // the 405 is thrown long before the create handler would be executed noRequestBody(0), // the 405 is thrown long before the create handler would be executed
noResponseBody(0), // the 405 is thrown long before the create handler would be executed noResponseBody(0), // the 405 is thrown long before the create handler would be executed
@ -183,7 +183,7 @@ func TestAudit(t *testing.T) {
}, },
selfLinker, selfLinker,
200, 200,
1, 2,
[]eventCheck{ []eventCheck{
noRequestBody(0), noRequestBody(0),
responseBodyMatches(0, `{.*"kind":"Status".*"status":"Success".*}`), responseBodyMatches(0, `{.*"kind":"Status".*"status":"Success".*}`),
@ -196,7 +196,7 @@ func TestAudit(t *testing.T) {
}, },
selfLinker, selfLinker,
200, 200,
1, 2,
[]eventCheck{ []eventCheck{
requestBodyMatches(0, "DeleteOptions"), requestBodyMatches(0, "DeleteOptions"),
responseBodyMatches(0, `{.*"kind":"Status".*"status":"Success".*}`), responseBodyMatches(0, `{.*"kind":"Status".*"status":"Success".*}`),
@ -209,7 +209,7 @@ func TestAudit(t *testing.T) {
}, },
selfLinker, selfLinker,
200, 200,
1, 2,
[]eventCheck{ []eventCheck{
requestBodyIs(0, string(simpleCPrimeJSON)), requestBodyIs(0, string(simpleCPrimeJSON)),
responseBodyMatches(0, `{.*"bla".*}`), responseBodyMatches(0, `{.*"bla".*}`),
@ -222,7 +222,7 @@ func TestAudit(t *testing.T) {
}, },
selfLinker, selfLinker,
400, 400,
1, 2,
[]eventCheck{ []eventCheck{
requestBodyIs(0, string(simpleCPrimeJSON)), requestBodyIs(0, string(simpleCPrimeJSON)),
responseBodyMatches(0, `"Status".*"status":"Failure".*"code":400}`), responseBodyMatches(0, `"Status".*"status":"Failure".*"code":400}`),
@ -242,7 +242,7 @@ func TestAudit(t *testing.T) {
namespace: "other", namespace: "other",
}, },
200, 200,
1, 2,
[]eventCheck{ []eventCheck{
requestBodyIs(0, `{"labels":{"foo":"bar"}}`), requestBodyIs(0, `{"labels":{"foo":"bar"}}`),
responseBodyMatches(0, `"name":"c".*"labels":{"foo":"bar"}`), responseBodyMatches(0, `"name":"c".*"labels":{"foo":"bar"}`),
@ -259,7 +259,7 @@ func TestAudit(t *testing.T) {
namespace: "other", namespace: "other",
}, },
200, 200,
2, 3,
[]eventCheck{ []eventCheck{
noRequestBody(0), noRequestBody(0),
noResponseBody(0), noResponseBody(0),

View File

@ -92,11 +92,10 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext
sink.ProcessEvents(ev) sink.ProcessEvents(ev)
// intercept the status code // intercept the status code
longRunning := false
var longRunningSink audit.Sink var longRunningSink audit.Sink
if longRunningCheck != nil { if longRunningCheck != nil {
ri, _ := request.RequestInfoFrom(ctx) ri, _ := request.RequestInfoFrom(ctx)
if longRunning = longRunningCheck(req, ri); longRunning { if longRunningCheck(req, ri) {
longRunningSink = sink longRunningSink = sink
} }
} }
@ -106,20 +105,34 @@ func WithAudit(handler http.Handler, requestContextMapper request.RequestContext
// running requests, this will be the second audit event. // running requests, this will be the second audit event.
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
defer panic(r)
ev.Stage = auditinternal.StagePanic
ev.ResponseStatus = &metav1.Status{ ev.ResponseStatus = &metav1.Status{
Code: http.StatusInternalServerError, Code: http.StatusInternalServerError,
Status: metav1.StatusFailure,
Reason: metav1.StatusReasonInternalError,
Message: fmt.Sprintf("APIServer panic'd: %v", r),
} }
sink.ProcessEvents(ev) sink.ProcessEvents(ev)
panic(r) return
} }
if ev.ResponseStatus == nil { // if no StageResponseStarted event was sent b/c neither a status code nor a body was sent, fake it here
ev.ResponseStatus = &metav1.Status{ fakedSuccessStatus := &metav1.Status{
Code: 200, Code: http.StatusOK,
} Status: metav1.StatusSuccess,
Message: "Connection closed early",
}
if ev.ResponseStatus == nil && longRunningSink != nil {
ev.ResponseStatus = fakedSuccessStatus
ev.Stage = auditinternal.StageResponseStarted
longRunningSink.ProcessEvents(ev)
} }
ev.Stage = auditinternal.StageResponseComplete ev.Stage = auditinternal.StageResponseComplete
if ev.ResponseStatus == nil {
ev.ResponseStatus = fakedSuccessStatus
}
sink.ProcessEvents(ev) sink.ProcessEvents(ev)
}() }()
handler.ServeHTTP(respWriter, req) handler.ServeHTTP(respWriter, req)

View File

@ -19,6 +19,7 @@ package filters
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"fmt"
"net" "net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -177,10 +178,17 @@ func (*fakeHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
func TestAudit(t *testing.T) { func TestAudit(t *testing.T) {
shortRunningPrefix := `[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="list" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"` writingShortRunningPrefix := func(stage string) string {
longRunningPrefix := `[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" ip="127.0.0.1" method="watch" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods\?watch=true"` return fmt.Sprintf(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" stage="%s" ip="127.0.0.1" method="update" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods/foo"`, stage)
}
readOnlyShortRunningPrefix := func(stage string) string {
return fmt.Sprintf(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" stage="%s" ip="127.0.0.1" method="get" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods/foo"`, stage)
}
longRunningPrefix := func(stage string) string {
return fmt.Sprintf(`[\d\:\-\.\+TZ]+ AUDIT: id="[\w-]+" stage="%s" ip="127.0.0.1" method="watch" user="admin" groups="<none>" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods\?watch=true"`, stage)
}
shortRunningPath := "/api/v1/namespaces/default/pods" shortRunningPath := "/api/v1/namespaces/default/pods/foo"
longRunningPath := "/api/v1/namespaces/default/pods?watch=true" longRunningPath := "/api/v1/namespaces/default/pods?watch=true"
delay := 500 * time.Millisecond delay := 500 * time.Millisecond
@ -188,58 +196,93 @@ func TestAudit(t *testing.T) {
for _, test := range []struct { for _, test := range []struct {
desc string desc string
path string path string
verb string
handler func(http.ResponseWriter, *http.Request) handler func(http.ResponseWriter, *http.Request)
expected []string expected []string
}{ }{
// short running requests // short running requests with read-only verb
{ {
"empty", "read-only empty",
shortRunningPath, shortRunningPath,
"GET",
func(http.ResponseWriter, *http.Request) {}, func(http.ResponseWriter, *http.Request) {},
[]string{ []string{
shortRunningPrefix + ` response="200"`, readOnlyShortRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
readOnlyShortRunningPrefix(auditinternal.StageResponseComplete) + ` response="200"`,
}, },
}, },
{ {
"sleep", "read-only panic",
shortRunningPath, shortRunningPath,
"GET",
func(w http.ResponseWriter, req *http.Request) {
panic("kaboom")
},
[]string{
readOnlyShortRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
readOnlyShortRunningPrefix(auditinternal.StagePanic) + ` response="500"`,
},
},
// short running request with non-read-only verb
{
"writing empty",
shortRunningPath,
"PUT",
func(http.ResponseWriter, *http.Request) {},
[]string{
writingShortRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
writingShortRunningPrefix(auditinternal.StageResponseComplete) + ` response="200"`,
},
},
{
"writing sleep",
shortRunningPath,
"PUT",
func(http.ResponseWriter, *http.Request) { func(http.ResponseWriter, *http.Request) {
time.Sleep(delay) time.Sleep(delay)
}, },
[]string{ []string{
shortRunningPrefix + ` response="200"`, writingShortRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
writingShortRunningPrefix(auditinternal.StageResponseComplete) + ` response="200"`,
}, },
}, },
{ {
"403+write", "writing 403+write",
shortRunningPath, shortRunningPath,
"PUT",
func(w http.ResponseWriter, req *http.Request) { func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(403) w.WriteHeader(403)
w.Write([]byte("foo")) w.Write([]byte("foo"))
}, },
[]string{ []string{
shortRunningPrefix + ` response="403"`, writingShortRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
writingShortRunningPrefix(auditinternal.StageResponseComplete) + ` response="403"`,
}, },
}, },
{ {
"panic", "writing panic",
shortRunningPath, shortRunningPath,
"PUT",
func(w http.ResponseWriter, req *http.Request) { func(w http.ResponseWriter, req *http.Request) {
panic("kaboom") panic("kaboom")
}, },
[]string{ []string{
shortRunningPrefix + ` response="500"`, writingShortRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
writingShortRunningPrefix(auditinternal.StagePanic) + ` response="500"`,
}, },
}, },
{ {
"write+panic", "writing write+panic",
shortRunningPath, shortRunningPath,
"PUT",
func(w http.ResponseWriter, req *http.Request) { func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("foo")) w.Write([]byte("foo"))
panic("kaboom") panic("kaboom")
}, },
[]string{ []string{
shortRunningPrefix + ` response="500"`, writingShortRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
writingShortRunningPrefix(auditinternal.StagePanic) + ` response="500"`,
}, },
}, },
@ -247,76 +290,92 @@ func TestAudit(t *testing.T) {
{ {
"empty longrunning", "empty longrunning",
longRunningPath, longRunningPath,
"GET",
func(http.ResponseWriter, *http.Request) {}, func(http.ResponseWriter, *http.Request) {},
[]string{ []string{
longRunningPrefix + ` response="200"`, longRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
longRunningPrefix(auditinternal.StageResponseStarted) + ` response="200"`,
longRunningPrefix(auditinternal.StageResponseComplete) + ` response="200"`,
}, },
}, },
{ {
"sleep longrunning", "sleep longrunning",
longRunningPath, longRunningPath,
"GET",
func(http.ResponseWriter, *http.Request) { func(http.ResponseWriter, *http.Request) {
time.Sleep(delay) time.Sleep(delay)
}, },
[]string{ []string{
longRunningPrefix + ` response="200"`, longRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
longRunningPrefix(auditinternal.StageResponseStarted) + ` response="200"`,
longRunningPrefix(auditinternal.StageResponseComplete) + ` response="200"`,
}, },
}, },
{ {
"sleep+403 longrunning", "sleep+403 longrunning",
longRunningPath, longRunningPath,
"GET",
func(w http.ResponseWriter, req *http.Request) { func(w http.ResponseWriter, req *http.Request) {
time.Sleep(delay) time.Sleep(delay)
w.WriteHeader(403) w.WriteHeader(403)
}, },
[]string{ []string{
longRunningPrefix + ` response="<deferred>"`, longRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
longRunningPrefix + ` response="403"`, longRunningPrefix(auditinternal.StageResponseStarted) + ` response="403"`,
longRunningPrefix(auditinternal.StageResponseComplete) + ` response="403"`,
}, },
}, },
{ {
"write longrunning", "write longrunning",
longRunningPath, longRunningPath,
"GET",
func(w http.ResponseWriter, req *http.Request) { func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("foo")) w.Write([]byte("foo"))
}, },
[]string{ []string{
longRunningPrefix + ` response="<deferred>"`, longRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
longRunningPrefix + ` response="200"`, longRunningPrefix(auditinternal.StageResponseStarted) + ` response="200"`,
longRunningPrefix(auditinternal.StageResponseComplete) + ` response="200"`,
}, },
}, },
{ {
"403+write longrunning", "403+write longrunning",
longRunningPath, longRunningPath,
"GET",
func(w http.ResponseWriter, req *http.Request) { func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(403) w.WriteHeader(403)
w.Write([]byte("foo")) w.Write([]byte("foo"))
}, },
[]string{ []string{
longRunningPrefix + ` response="<deferred>"`, longRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
longRunningPrefix + ` response="403"`, longRunningPrefix(auditinternal.StageResponseStarted) + ` response="403"`,
longRunningPrefix(auditinternal.StageResponseComplete) + ` response="403"`,
}, },
}, },
{ {
"panic longrunning", "panic longrunning",
longRunningPath, longRunningPath,
"GET",
func(w http.ResponseWriter, req *http.Request) { func(w http.ResponseWriter, req *http.Request) {
panic("kaboom") panic("kaboom")
}, },
[]string{ []string{
longRunningPrefix + ` response="500"`, longRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
longRunningPrefix(auditinternal.StagePanic) + ` response="500"`,
}, },
}, },
{ {
"write+panic longrunning", "write+panic longrunning",
longRunningPath, longRunningPath,
"GET",
func(w http.ResponseWriter, req *http.Request) { func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("foo")) w.Write([]byte("foo"))
panic("kaboom") panic("kaboom")
}, },
[]string{ []string{
longRunningPrefix + ` response="<deferred>"`, longRunningPrefix(auditinternal.StageRequestReceived) + ` response="<deferred>"`,
longRunningPrefix + ` response="500"`, longRunningPrefix(auditinternal.StageResponseStarted) + ` response="200"`,
longRunningPrefix(auditinternal.StagePanic) + ` response="500"`,
}, },
}, },
} { } {
@ -330,7 +389,7 @@ func TestAudit(t *testing.T) {
return ri.Verb == "watch" return ri.Verb == "watch"
}) })
req, _ := http.NewRequest("GET", test.path, nil) req, _ := http.NewRequest(test.verb, test.path, nil)
req.RemoteAddr = "127.0.0.1" req.RemoteAddr = "127.0.0.1"
func() { func() {

View File

@ -82,8 +82,8 @@ func (b *backend) logEvent(ev *auditinternal.Event) {
ip = ev.SourceIPs[0] ip = ev.SourceIPs[0]
} }
line := fmt.Sprintf("%s AUDIT: id=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q response=\"%s\"\n", line := fmt.Sprintf("%s AUDIT: id=%q stage=%q ip=%q method=%q user=%q groups=%q as=%q asgroups=%q namespace=%q uri=%q response=\"%s\"\n",
ev.Timestamp.Format(time.RFC3339Nano), ev.AuditID, ip, ev.Verb, username, groups, asuser, asgroups, namespace, ev.RequestURI, response) ev.Timestamp.Format(time.RFC3339Nano), ev.AuditID, ev.Stage, ip, ev.Verb, username, groups, asuser, asgroups, namespace, ev.RequestURI, response)
if _, err := fmt.Fprint(b.out, line); err != nil { if _, err := fmt.Fprint(b.out, line); err != nil {
glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err) glog.Errorf("Unable to write audit log: %s, the error is: %v", line, err)
} }