Fix httplog not logging watch duration in separate goroutines

Signed-off-by: Eric Lin <exlin@google.com>

Kubernetes-commit: 06c7058115e623126884d05c54a30db511a9cb71
This commit is contained in:
Eric Lin 2024-06-21 10:03:31 +00:00 committed by Kubernetes Publisher
parent 19c13772cd
commit 5d14d72b5c
6 changed files with 139 additions and 27 deletions

View File

@ -41,7 +41,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/rest"
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/routine"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
@ -285,7 +285,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
}
// Run watch serving in a separate goroutine to allow freeing current stack memory
t := genericfilters.TaskFrom(req.Context())
t := routine.TaskFrom(req.Context())
if t != nil {
t.Func = serve
} else {

View File

@ -64,6 +64,7 @@ import (
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/routes"
"k8s.io/apiserver/pkg/server/routine"
serverstore "k8s.io/apiserver/pkg/server/storage"
storagevalue "k8s.io/apiserver/pkg/storage/value"
"k8s.io/apiserver/pkg/storageversion"
@ -1061,7 +1062,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
// handler in current goroutine to minimize the stack memory usage. It must be
// after WithPanicRecover() to be protected from panics.
if c.FeatureGate.Enabled(genericfeatures.APIServingWithRoutine) {
handler = genericfilters.WithRoutine(handler, c.LongRunningFunc)
handler = routine.WithRoutine(handler, c.LongRunningFunc)
}
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
handler = genericapifilters.WithRequestReceivedTimestamp(handler)

View File

@ -1,5 +1,5 @@
/*
Copyright 2023 The Kubernetes Authors.
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/routine"
"k8s.io/klog/v2"
)
@ -42,7 +43,7 @@ func TestPropogatingPanic(t *testing.T) {
APIPrefixes: sets.NewString("api", "apis"),
GrouplessAPIPrefixes: sets.NewString("api"),
}
ts := httptest.NewServer(WithRoutine(WithPanicRecovery(handler, resolver), func(_ *http.Request, _ *request.RequestInfo) bool { return true }))
ts := httptest.NewServer(routine.WithRoutine(WithPanicRecovery(handler, resolver), func(_ *http.Request, _ *request.RequestInfo) bool { return true }))
defer ts.Close()
_, err := http.Get(ts.URL)
if err == nil {
@ -57,23 +58,3 @@ func TestPropogatingPanic(t *testing.T) {
t.Errorf("unexpected out captured actual = %v", capturedOutput)
}
}
func TestExecutionWithRoutine(t *testing.T) {
var executed bool
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t := TaskFrom(r.Context())
t.Func = func() {
executed = true
}
})
ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true }))
defer ts.Close()
_, err := http.Get(ts.URL)
if err != nil {
t.Errorf("got unexpected error on request: %v", err)
}
if !executed {
t.Error("expected to execute")
}
}

View File

@ -31,6 +31,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/apiserver/pkg/server/routine"
"k8s.io/klog/v2"
)
@ -125,10 +126,26 @@ func withLogging(handler http.Handler, stackTracePred StacktracePred, shouldLogR
rl := newLoggedWithStartTime(req, w, startTime)
rl.StacktraceWhen(stackTracePred)
req = req.WithContext(context.WithValue(ctx, respLoggerContextKey, rl))
defer rl.Log()
var logFunc func()
logFunc = rl.Log
defer func() {
if logFunc != nil {
logFunc()
}
}()
w = responsewriter.WrapForHTTP1Or2(rl)
handler.ServeHTTP(w, req)
// We need to ensure that the request is logged after it is processed.
// In case the request is executed in a separate goroutine created via
// WithRoutine handler in the handler chain (i.e. above handler.ServeHTTP()
// would return request is completely responsed), we want the logging to
// happen in that goroutine too, so we append it to the task.
if routine.AppendTask(ctx, &routine.Task{Func: rl.Log}) {
logFunc = nil
}
})
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
package routine
import (
"context"
@ -35,6 +35,20 @@ func WithTask(parent context.Context, t *Task) context.Context {
return request.WithValue(parent, taskKey, t)
}
// AppendTask appends a task executed after completion of existing task.
// It is a no-op if there is no existing task.
func AppendTask(ctx context.Context, t *Task) bool {
if existTask := TaskFrom(ctx); existTask != nil && existTask.Func != nil {
existFunc := existTask.Func
existTask.Func = func() {
existFunc()
t.Func()
}
return true
}
return false
}
func TaskFrom(ctx context.Context) *Task {
t, _ := ctx.Value(taskKey).(*Task)
return t

View File

@ -0,0 +1,99 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package routine
import (
"net/http"
"net/http/httptest"
"testing"
"k8s.io/apiserver/pkg/endpoints/request"
)
func TestExecutionWithRoutine(t *testing.T) {
var executed bool
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t := TaskFrom(r.Context())
t.Func = func() {
executed = true
}
})
ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true }))
defer ts.Close()
_, err := http.Get(ts.URL)
if err != nil {
t.Errorf("got unexpected error on request: %v", err)
}
if !executed {
t.Error("expected to execute")
}
}
func TestAppendTask(t *testing.T) {
tests := []struct {
name string
existingTask bool
taskAppended bool
shouldExecute bool
}{
{
name: "append task when existing",
existingTask: true,
taskAppended: true,
shouldExecute: true,
},
{
name: "not append task when no existing tasks",
existingTask: false,
taskAppended: false,
shouldExecute: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var executed, appended bool
taskToAppend := func() {
executed = true
}
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if test.existingTask {
t := TaskFrom(ctx)
t.Func = func() {}
}
appended = AppendTask(ctx, &Task{taskToAppend})
})
ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true }))
defer ts.Close()
_, err := http.Get(ts.URL)
if err != nil {
t.Errorf("got unexpected error on request: %v", err)
}
if test.taskAppended != appended {
t.Errorf("expected taskAppended: %t, got: %t", test.taskAppended, executed)
}
if test.shouldExecute != executed {
t.Errorf("expected shouldExecute: %t, got: %t", test.shouldExecute, executed)
}
})
}
}