add otel tracing to latency filters

Kubernetes-commit: ed1610ad15f91b72017c5d69dc4f7d59a17c270f
This commit is contained in:
David Ashpole 2022-10-20 16:17:02 +00:00 committed by Kubernetes Publisher
parent 352315aae1
commit 0cf3af5b9f
4 changed files with 58 additions and 10 deletions

View File

@ -22,6 +22,8 @@ import (
"net/http"
"time"
"go.opentelemetry.io/otel/trace"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/httplog"
@ -54,8 +56,8 @@ func requestFilterRecordFrom(ctx context.Context) *requestFilterRecord {
// TrackStarted measures the timestamp the given handler has started execution
// by attaching a handler to the chain.
func TrackStarted(handler http.Handler, name string) http.Handler {
return trackStarted(handler, name, clock.RealClock{})
func TrackStarted(handler http.Handler, tp trace.TracerProvider, name string) http.Handler {
return trackStarted(handler, tp, name, clock.RealClock{})
}
// TrackCompleted measures the timestamp the given handler has completed execution and then
@ -70,7 +72,9 @@ func TrackCompleted(handler http.Handler) http.Handler {
})
}
func trackStarted(handler http.Handler, name string, clock clock.PassiveClock) http.Handler {
func trackStarted(handler http.Handler, tp trace.TracerProvider, name string, clock clock.PassiveClock) http.Handler {
// This is a noop if the tracing is disabled, since tp will be a NoopTracerProvider
tracer := tp.Tracer("k8s.op/apiserver/pkg/endpoints/filterlatency")
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if fr := requestFilterRecordFrom(ctx); fr != nil {
@ -85,6 +89,7 @@ func trackStarted(handler http.Handler, name string, clock clock.PassiveClock) h
name: name,
startedTimestamp: clock.Now(),
}
ctx, _ = tracer.Start(ctx, name)
r = r.WithContext(withRequestFilterRecord(ctx, fr))
handler.ServeHTTP(w, r)
})
@ -101,5 +106,6 @@ func trackCompleted(handler http.Handler, clock clock.PassiveClock, action func(
if fr := requestFilterRecordFrom(ctx); fr != nil {
action(ctx, fr, completedAt)
}
trace.SpanFromContext(ctx).End()
})
}

View File

@ -23,6 +23,10 @@ import (
"testing"
"time"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
testingclock "k8s.io/utils/clock/testing"
)
@ -41,7 +45,7 @@ func TestTrackStartedWithContextAlreadyHasFilterRecord(t *testing.T) {
})
requestFilterStarted := time.Now()
wrapped := trackStarted(handler, filterName, testingclock.NewFakeClock(requestFilterStarted))
wrapped := trackStarted(handler, trace.NewNoopTracerProvider(), filterName, testingclock.NewFakeClock(requestFilterStarted))
testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil)
if err != nil {
@ -84,7 +88,7 @@ func TestTrackStartedWithContextDoesNotHaveFilterRecord(t *testing.T) {
})
requestFilterStarted := time.Now()
wrapped := trackStarted(handler, filterName, testingclock.NewFakeClock(requestFilterStarted))
wrapped := trackStarted(handler, trace.NewNoopTracerProvider(), filterName, testingclock.NewFakeClock(requestFilterStarted))
testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil)
if err != nil {
@ -176,3 +180,39 @@ func TestTrackCompletedContextDoesNotHaveFilterRecord(t *testing.T) {
t.Errorf("expected the callback to not be invoked, but was actually invoked %d times", actionCallCount)
}
}
func TestStartedAndCompletedOpenTelemetryTracing(t *testing.T) {
filterName := "my-filter"
// Seup OTel for testing
fakeRecorder := tracetest.NewSpanRecorder()
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(fakeRecorder))
// base handler func
var callCount int
handler := http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {
// we expect the handler to be invoked just once.
callCount++
})
// wrap with start and completed handler
wrapped := TrackCompleted(handler)
wrapped = TrackStarted(wrapped, tp, filterName)
testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil)
if err != nil {
t.Fatalf("failed to create new http request - %v", err)
}
wrapped.ServeHTTP(httptest.NewRecorder(), testRequest)
if callCount != 1 {
t.Errorf("expected the given handler to be invoked once, but was actually invoked %d times", callCount)
}
output := fakeRecorder.Ended()
if len(output) != 1 {
t.Fatalf("got %d; expected len(output) == 1", len(output))
}
span := output[0]
if span.Name() != filterName {
t.Fatalf("got %s; expected span.Name == my-filter", span.Name())
}
}

View File

@ -828,7 +828,7 @@ func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := filterlatency.TrackCompleted(apiHandler)
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
handler = filterlatency.TrackStarted(handler, "authorization")
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "authorization")
if c.FlowControl != nil {
workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
@ -836,18 +836,18 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg)
handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
handler = filterlatency.TrackStarted(handler, "priorityandfairness")
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness")
} else {
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
}
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = filterlatency.TrackStarted(handler, "impersonation")
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "impersonation")
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
handler = filterlatency.TrackStarted(handler, "audit")
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "audit")
failedHandler := genericapifilters.Unauthorized(c.Serializer)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
@ -855,7 +855,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
failedHandler = filterlatency.TrackCompleted(failedHandler)
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
handler = filterlatency.TrackStarted(handler, "authentication")
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "authentication")
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")

View File

@ -40,6 +40,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"k8s.io/component-base/tracing"
netutils "k8s.io/utils/net"
)
@ -302,6 +303,7 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) {
RequestTimeout: 10 * time.Second,
LongRunningFunc: func(_ *http.Request, _ *request.RequestInfo) bool { return false },
lifecycleSignals: newLifecycleSignals(),
TracerProvider: tracing.NewNoopTracerProvider(),
}
h := DefaultBuildHandlerChain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {