Merge pull request #127029 from tkashem/apf-fix-watch-panic-handling
apf: request handler must wait for watch init goroutine to return Kubernetes-commit: 8bc073a5fe7415099f936f22ff92c96391b9c0c8
This commit is contained in:
commit
b16c96d150
4
go.mod
4
go.mod
|
@ -50,8 +50,8 @@ require (
|
|||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||
gopkg.in/square/go-jose.v2 v2.6.0
|
||||
k8s.io/api v0.0.0-20240920202009-71385f038c10
|
||||
k8s.io/apimachinery v0.0.0-20240926041705-dc03077c038e
|
||||
k8s.io/client-go v0.0.0-20240928202509-ea4f3d0f6f99
|
||||
k8s.io/apimachinery v0.0.0-20240929035808-0db5dbf03048
|
||||
k8s.io/client-go v0.0.0-20240929082523-46093399c4de
|
||||
k8s.io/component-base v0.0.0-20240928083227-66de10e147bc
|
||||
k8s.io/klog/v2 v2.130.1
|
||||
k8s.io/kms v0.0.0-20240912041232-273c893e4e51
|
||||
|
|
8
go.sum
8
go.sum
|
@ -371,10 +371,10 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
|
|||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
k8s.io/api v0.0.0-20240920202009-71385f038c10 h1:shjQe98Co9zBlDzQkxb5IJEWtReSl7qunr56C4Jgc70=
|
||||
k8s.io/api v0.0.0-20240920202009-71385f038c10/go.mod h1:KCEt6+W/Yn1Vc48pYXeLf0mGK52kJhvt+rcaUVsIaKQ=
|
||||
k8s.io/apimachinery v0.0.0-20240926041705-dc03077c038e h1:/N2qUkIcQ8VaZbmy28grdhVBbNXO6KLZRdz6F2siNeY=
|
||||
k8s.io/apimachinery v0.0.0-20240926041705-dc03077c038e/go.mod h1:5rKPDwwN9qm//xASFCZ83nyYEanHxxhc7pZ8AC4lukY=
|
||||
k8s.io/client-go v0.0.0-20240928202509-ea4f3d0f6f99 h1:67SwvL0CAMQPF0yCTrnLWxQFvLMcZPwR06YjmjMeYzE=
|
||||
k8s.io/client-go v0.0.0-20240928202509-ea4f3d0f6f99/go.mod h1:+LjOIsTEpEsaW4dpOAa5PueCf6hTShgzbB0+c+KT4K0=
|
||||
k8s.io/apimachinery v0.0.0-20240929035808-0db5dbf03048 h1:EWQWppfphUSBwuhuNA4weJ9vJtfHhjVwizTYjZb8ikw=
|
||||
k8s.io/apimachinery v0.0.0-20240929035808-0db5dbf03048/go.mod h1:5rKPDwwN9qm//xASFCZ83nyYEanHxxhc7pZ8AC4lukY=
|
||||
k8s.io/client-go v0.0.0-20240929082523-46093399c4de h1:a/AoWBaxo6kjvY29p3PUxc7PDUfyTHQCBb3zoA7L7gw=
|
||||
k8s.io/client-go v0.0.0-20240929082523-46093399c4de/go.mod h1:rV8uxwGX3sOvz0lP3/B/bLL2+dtQ7QmA4iU+d7u8snk=
|
||||
k8s.io/component-base v0.0.0-20240928083227-66de10e147bc h1:rcpUSxWdWOVlC4auITDsLVD8mkDwKoO5ngGUEW1GARg=
|
||||
k8s.io/component-base v0.0.0-20240928083227-66de10e147bc/go.mod h1:y3+nKeFC4lXl3nEHGAXUtoRopMfxd7um7GOQzZWcdi8=
|
||||
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
|
||||
|
|
|
@ -266,17 +266,23 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
|
|||
|
||||
select {
|
||||
case <-shouldStartWatchCh:
|
||||
watchCtx := utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
|
||||
watchReq = r.WithContext(watchCtx)
|
||||
h.handler.ServeHTTP(w, watchReq)
|
||||
// Protect from the situation when request will not reach storage layer
|
||||
// and the initialization signal will not be send.
|
||||
// It has to happen before waiting on the resultCh below.
|
||||
watchInitializationSignal.Signal()
|
||||
// TODO: Consider finishing the request as soon as Handle call panics.
|
||||
if err := <-resultCh; err != nil {
|
||||
panic(err)
|
||||
}
|
||||
func() {
|
||||
// TODO: if both goroutines panic, propagate the stack traces from both
|
||||
// goroutines so they are logged properly:
|
||||
defer func() {
|
||||
// Protect from the situation when request will not reach storage layer
|
||||
// and the initialization signal will not be send.
|
||||
// It has to happen before waiting on the resultCh below.
|
||||
watchInitializationSignal.Signal()
|
||||
// TODO: Consider finishing the request as soon as Handle call panics.
|
||||
if err := <-resultCh; err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
watchCtx := utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
|
||||
watchReq = r.WithContext(watchCtx)
|
||||
h.handler.ServeHTTP(w, watchReq)
|
||||
}()
|
||||
case err := <-resultCh:
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
|
@ -177,11 +177,21 @@ func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Int
|
|||
r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
|
||||
Groups: []string{user.AllUnauthenticated},
|
||||
}))
|
||||
apfHandler.ServeHTTP(w, r)
|
||||
postExecute()
|
||||
if atomicReadOnlyExecuting != 0 {
|
||||
t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting)
|
||||
}
|
||||
func() {
|
||||
// the APF handler completes its run, either normally or
|
||||
// with a panic, in either case, all APF book keeping must
|
||||
// be completed by now. Also, whether the request is
|
||||
// executed or rejected, we expect the counter to be zero.
|
||||
// TODO: all test(s) using this filter must run
|
||||
// serially to each other
|
||||
defer func() {
|
||||
if atomicReadOnlyExecuting != 0 {
|
||||
t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting)
|
||||
}
|
||||
}()
|
||||
apfHandler.ServeHTTP(w, r)
|
||||
postExecute()
|
||||
}()
|
||||
}), requestInfoFactory)
|
||||
|
||||
return handler
|
||||
|
|
Loading…
Reference in New Issue