add context to metric in apiserver/audit

Kubernetes-commit: 4ba3f1a982227a30b083f6359e76a616e9eabfd1
This commit is contained in:
yoyinzyc 2021-01-20 12:04:41 -08:00 committed by Kubernetes Publisher
parent abc2bd5cb2
commit 6f3753addf
4 changed files with 26 additions and 21 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package audit
import (
"context"
"fmt"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
@ -84,13 +85,13 @@ func init() {
}
// ObserveEvent updates the relevant prometheus metrics for the generated audit event.
func ObserveEvent() {
eventCounter.Inc()
func ObserveEvent(ctx context.Context) {
eventCounter.WithContext(ctx).Inc()
}
// ObservePolicyLevel updates the relevant prometheus metrics with the audit level for a request.
func ObservePolicyLevel(level auditinternal.Level) {
levelCounter.WithLabelValues(string(level)).Inc()
func ObservePolicyLevel(ctx context.Context, level auditinternal.Level) {
levelCounter.WithContext(ctx).WithLabelValues(string(level)).Inc()
}
// HandlePluginError handles an error that occurred in an audit plugin. This method should only be

View File

@ -18,6 +18,7 @@ package filters
import (
"bufio"
"context"
"errors"
"fmt"
"net"
@ -56,8 +57,8 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
}
ev.Stage = auditinternal.StageRequestReceived
if processed := processAuditEvent(sink, ev, omitStages); !processed {
audit.ApiserverAuditDroppedCounter.Inc()
if processed := processAuditEvent(ctx, sink, ev, omitStages); !processed {
audit.ApiserverAuditDroppedCounter.WithContext(ctx).Inc()
responsewriters.InternalError(w, req, errors.New("failed to store audit event"))
return
}
@ -70,7 +71,7 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
longRunningSink = sink
}
}
respWriter := decorateResponseWriter(w, ev, longRunningSink, omitStages)
respWriter := decorateResponseWriter(ctx, w, ev, longRunningSink, omitStages)
// send audit event when we leave this func, either via a panic or cleanly. In the case of long
// running requests, this will be the second audit event.
@ -84,7 +85,7 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
Reason: metav1.StatusReasonInternalError,
Message: fmt.Sprintf("APIServer panic'd: %v", r),
}
processAuditEvent(sink, ev, omitStages)
processAuditEvent(ctx, sink, ev, omitStages)
return
}
@ -98,14 +99,14 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
if ev.ResponseStatus == nil && longRunningSink != nil {
ev.ResponseStatus = fakedSuccessStatus
ev.Stage = auditinternal.StageResponseStarted
processAuditEvent(longRunningSink, ev, omitStages)
processAuditEvent(ctx, longRunningSink, ev, omitStages)
}
ev.Stage = auditinternal.StageResponseComplete
if ev.ResponseStatus == nil {
ev.ResponseStatus = fakedSuccessStatus
}
processAuditEvent(sink, ev, omitStages)
processAuditEvent(ctx, sink, ev, omitStages)
}()
handler.ServeHTTP(respWriter, req)
})
@ -125,7 +126,7 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker
}
level, omitStages := policy.LevelAndStages(attribs)
audit.ObservePolicyLevel(level)
audit.ObservePolicyLevel(ctx, level)
if level == auditinternal.LevelNone {
// Don't audit.
return req, nil, nil, nil
@ -145,7 +146,7 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker
return req, ev, omitStages, nil
}
func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool {
func processAuditEvent(ctx context.Context, sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool {
for _, stage := range omitStages {
if ev.Stage == stage {
return true
@ -157,12 +158,13 @@ func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []au
} else {
ev.StageTimestamp = metav1.NewMicroTime(time.Now())
}
audit.ObserveEvent()
audit.ObserveEvent(ctx)
return sink.ProcessEvents(ev)
}
func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter {
func decorateResponseWriter(ctx context.Context, responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter {
delegate := &auditResponseWriter{
ctx: ctx,
ResponseWriter: responseWriter,
event: ev,
sink: sink,
@ -186,6 +188,7 @@ var _ http.ResponseWriter = &auditResponseWriter{}
// create immediately an event (for long running requests).
type auditResponseWriter struct {
http.ResponseWriter
ctx context.Context
event *auditinternal.Event
once sync.Once
sink audit.Sink
@ -205,7 +208,7 @@ func (a *auditResponseWriter) processCode(code int) {
a.event.Stage = auditinternal.StageResponseStarted
if a.sink != nil {
processAuditEvent(a.sink, a.event, a.omitStages)
processAuditEvent(a.ctx, a.sink, a.event, a.omitStages)
}
})
}

View File

@ -18,6 +18,7 @@ package filters
import (
"bufio"
"context"
"net"
"net/http"
"net/http/httptest"
@ -92,14 +93,14 @@ func (*fancyResponseWriter) Flush() {}
func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil }
func TestConstructResponseWriter(t *testing.T) {
actual := decorateResponseWriter(&simpleResponseWriter{}, nil, nil, nil)
actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, nil, nil, nil)
switch v := actual.(type) {
case *auditResponseWriter:
default:
t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v))
}
actual = decorateResponseWriter(&fancyResponseWriter{}, nil, nil, nil)
actual = decorateResponseWriter(context.TODO(), &fancyResponseWriter{}, nil, nil, nil)
switch v := actual.(type) {
case *fancyResponseWriterDelegator:
default:
@ -109,7 +110,7 @@ func TestConstructResponseWriter(t *testing.T) {
func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil, nil)
actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, nil, nil)
// write status. This will not block because firstEventSentCh is nil
actual.WriteHeader(42)
@ -123,7 +124,7 @@ func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil, nil)
actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, nil, nil)
// write status. This will not block because firstEventSentCh is nil
actual.Write([]byte("foo"))
@ -138,7 +139,7 @@ func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
func TestDecorateResponseWriterChannel(t *testing.T) {
sink := &fakeAuditSink{}
ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, sink, nil)
actual := decorateResponseWriter(context.TODO(), &simpleResponseWriter{}, ev, sink, nil)
done := make(chan struct{})
go func() {

View File

@ -52,7 +52,7 @@ func WithFailedAuthenticationAudit(failedHandler http.Handler, sink audit.Sink,
ev.ResponseStatus.Message = getAuthMethods(req)
ev.Stage = auditinternal.StageResponseStarted
rw := decorateResponseWriter(w, ev, sink, omitStages)
rw := decorateResponseWriter(req.Context(), w, ev, sink, omitStages)
failedHandler.ServeHTTP(rw, req)
})
}