do not allow inflight watermark histograms to fall too far behind
The MaxInFlight and PriorityAndFairness apiserver filters maintain watermarks with histogram metrics that are observed when requests are handled. When a request is received, the watermark observer needs to fill out observations for the entire time period since the last request was received. If it has been a long time since a request has been received, then it can take an inordinate amount of time to fill out the observations, to the extent that the request may time out. To combat this, these changes will have the filters fill out the observations on a 10-second interval, so that the observations never fall too far behind. This follows a similar approach taken in 9e89b92a92c02cdd2c70c0f52a30936e9c3309c7. https://github.com/kubernetes/kubernetes/issues/95300 The Priority-and-Fairness and Max-in-Flight filters start goroutines to handle some maintenance tasks on the watermarks for those filters. Once started, these goroutines run forever. Instead, the goroutines should have a lifetime tied to the lifetime of the apiserver. These changes move the functionality for starting the goroutines to a PostStartHook. The goroutines have been changed to accept a stop channel and only run until the stop channel is closed. Kubernetes-commit: 6c9b86646871f13a4431361310ba6a0785372053
This commit is contained in:
parent
bc90f6d3a2
commit
a541a1b602
|
|
@ -639,6 +639,31 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||
klog.V(3).Infof("Not requested to run hook %s", priorityAndFairnessConfigConsumerHookName)
|
||||
}
|
||||
|
||||
// Add PostStartHooks for maintaining the watermarks for the Priority-and-Fairness and the Max-in-Flight filters.
|
||||
if c.FlowControl != nil {
|
||||
const priorityAndFairnessFilterHookName = "priority-and-fairness-filter"
|
||||
if !s.isPostStartHookRegistered(priorityAndFairnessFilterHookName) {
|
||||
err := s.AddPostStartHook(priorityAndFairnessFilterHookName, func(context PostStartHookContext) error {
|
||||
genericfilters.StartPriorityAndFairnessWatermarkMaintenance(context.StopCh)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
const maxInFlightFilterHookName = "max-in-flight-filter"
|
||||
if !s.isPostStartHookRegistered(maxInFlightFilterHookName) {
|
||||
err := s.AddPostStartHook(maxInFlightFilterHookName, func(context PostStartHookContext) error {
|
||||
genericfilters.StartMaxInFlightWatermarkMaintenance(context.StopCh)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, delegateCheck := range delegationTarget.HealthzChecks() {
|
||||
skip := false
|
||||
for _, existingCheck := range c.HealthzChecks {
|
||||
|
|
|
|||
|
|
@ -155,6 +155,7 @@ func TestNewWithDelegate(t *testing.T) {
|
|||
"/healthz/ping",
|
||||
"/healthz/poststarthook/delegate-post-start-hook",
|
||||
"/healthz/poststarthook/generic-apiserver-start-informers",
|
||||
"/healthz/poststarthook/max-in-flight-filter",
|
||||
"/healthz/poststarthook/wrapping-post-start-hook",
|
||||
"/healthz/wrapping-health",
|
||||
"/livez",
|
||||
|
|
@ -163,6 +164,7 @@ func TestNewWithDelegate(t *testing.T) {
|
|||
"/livez/ping",
|
||||
"/livez/poststarthook/delegate-post-start-hook",
|
||||
"/livez/poststarthook/generic-apiserver-start-informers",
|
||||
"/livez/poststarthook/max-in-flight-filter",
|
||||
"/livez/poststarthook/wrapping-post-start-hook",
|
||||
"/metrics",
|
||||
"/readyz",
|
||||
|
|
@ -172,6 +174,7 @@ func TestNewWithDelegate(t *testing.T) {
|
|||
"/readyz/ping",
|
||||
"/readyz/poststarthook/delegate-post-start-hook",
|
||||
"/readyz/poststarthook/generic-apiserver-start-informers",
|
||||
"/readyz/poststarthook/max-in-flight-filter",
|
||||
"/readyz/poststarthook/wrapping-post-start-hook",
|
||||
"/readyz/shutdown",
|
||||
}
|
||||
|
|
@ -181,6 +184,7 @@ func TestNewWithDelegate(t *testing.T) {
|
|||
[-]wrapping-health failed: reason withheld
|
||||
[-]delegate-health failed: reason withheld
|
||||
[+]poststarthook/generic-apiserver-start-informers ok
|
||||
[+]poststarthook/max-in-flight-filter ok
|
||||
[+]poststarthook/delegate-post-start-hook ok
|
||||
[+]poststarthook/wrapping-post-start-hook ok
|
||||
healthz check failed
|
||||
|
|
|
|||
|
|
@ -41,6 +41,10 @@ const (
|
|||
// the metrics tracks maximal value over period making this
|
||||
// longer will increase the metric value.
|
||||
inflightUsageMetricUpdatePeriod = time.Second
|
||||
|
||||
// How often to run maintenance on observations to ensure
|
||||
// that they do not fall too far behind.
|
||||
observationMaintenancePeriod = 10 * time.Second
|
||||
)
|
||||
|
||||
var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
|
||||
|
|
@ -88,23 +92,29 @@ var watermark = &requestWatermark{
|
|||
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting,
|
||||
}
|
||||
|
||||
func startRecordingUsage(watermark *requestWatermark) {
|
||||
go func() {
|
||||
wait.Forever(func() {
|
||||
watermark.lock.Lock()
|
||||
readOnlyWatermark := watermark.readOnlyWatermark
|
||||
mutatingWatermark := watermark.mutatingWatermark
|
||||
watermark.readOnlyWatermark = 0
|
||||
watermark.mutatingWatermark = 0
|
||||
watermark.lock.Unlock()
|
||||
// startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark.
|
||||
func startWatermarkMaintenance(watermark *requestWatermark, stopCh <-chan struct{}) {
|
||||
// Periodically update the inflight usage metric.
|
||||
go wait.Until(func() {
|
||||
watermark.lock.Lock()
|
||||
readOnlyWatermark := watermark.readOnlyWatermark
|
||||
mutatingWatermark := watermark.mutatingWatermark
|
||||
watermark.readOnlyWatermark = 0
|
||||
watermark.mutatingWatermark = 0
|
||||
watermark.lock.Unlock()
|
||||
|
||||
metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
|
||||
}, inflightUsageMetricUpdatePeriod)
|
||||
}()
|
||||
metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
|
||||
}, inflightUsageMetricUpdatePeriod, stopCh)
|
||||
|
||||
// Periodically observe the watermarks. This is done to ensure that they do not fall too far behind. When they do
|
||||
// fall too far behind, then there is a long delay in responding to the next request received while the observer
|
||||
// catches back up.
|
||||
go wait.Until(func() {
|
||||
watermark.readOnlyObserver.Add(0)
|
||||
watermark.mutatingObserver.Add(0)
|
||||
}, observationMaintenancePeriod, stopCh)
|
||||
}
|
||||
|
||||
var startOnce sync.Once
|
||||
|
||||
// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
|
||||
func WithMaxInFlightLimit(
|
||||
handler http.Handler,
|
||||
|
|
@ -112,7 +122,6 @@ func WithMaxInFlightLimit(
|
|||
mutatingLimit int,
|
||||
longRunningRequestCheck apirequest.LongRunningRequestCheck,
|
||||
) http.Handler {
|
||||
startOnce.Do(func() { startRecordingUsage(watermark) })
|
||||
if nonMutatingLimit == 0 && mutatingLimit == 0 {
|
||||
return handler
|
||||
}
|
||||
|
|
@ -198,6 +207,12 @@ func WithMaxInFlightLimit(
|
|||
})
|
||||
}
|
||||
|
||||
// StartMaxInFlightWatermarkMaintenance starts the goroutines to observe and maintain watermarks for max-in-flight
|
||||
// requests.
|
||||
func StartMaxInFlightWatermarkMaintenance(stopCh <-chan struct{}) {
|
||||
startWatermarkMaintenance(watermark, stopCh)
|
||||
}
|
||||
|
||||
func tooManyRequests(req *http.Request, w http.ResponseWriter) {
|
||||
// Return a 429 status indicating "Too Many Requests"
|
||||
w.Header().Set("Retry-After", retryAfter)
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package filters
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
|
|
@ -103,6 +104,10 @@ func TestMaxInFlightNonMutating(t *testing.T) {
|
|||
server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
|
||||
defer server.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
StartMaxInFlightWatermarkMaintenance(ctx.Done())
|
||||
|
||||
// These should hang, but not affect accounting. use a query param match
|
||||
for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ {
|
||||
// These should hang waiting on block...
|
||||
|
|
@ -183,6 +188,10 @@ func TestMaxInFlightMutating(t *testing.T) {
|
|||
server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
|
||||
defer server.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
StartMaxInFlightWatermarkMaintenance(ctx.Done())
|
||||
|
||||
// These should hang and be accounted, i.e. saturate the server
|
||||
for i := 0; i < AllowedMutatingInflightRequestsNo; i++ {
|
||||
// These should hang waiting on block...
|
||||
|
|
@ -275,6 +284,10 @@ func TestMaxInFlightSkipsMasters(t *testing.T) {
|
|||
server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
|
||||
defer server.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
StartMaxInFlightWatermarkMaintenance(ctx.Done())
|
||||
|
||||
// These should hang and be accounted, i.e. saturate the server
|
||||
for i := 0; i < AllowedMutatingInflightRequestsNo; i++ {
|
||||
// These should hang waiting on block...
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
|
||||
|
|
@ -58,9 +57,6 @@ var waitingMark = &requestWatermark{
|
|||
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting,
|
||||
}
|
||||
|
||||
// apfStartOnce is used to avoid sharing one-time mutex with maxinflight handler
|
||||
var apfStartOnce sync.Once
|
||||
|
||||
var atomicMutatingExecuting, atomicReadOnlyExecuting int32
|
||||
var atomicMutatingWaiting, atomicReadOnlyWaiting int32
|
||||
|
||||
|
|
@ -75,12 +71,6 @@ func WithPriorityAndFairness(
|
|||
klog.Warningf("priority and fairness support not found, skipping")
|
||||
return handler
|
||||
}
|
||||
startOnce.Do(func() {
|
||||
startRecordingUsage(watermark)
|
||||
})
|
||||
apfStartOnce.Do(func() {
|
||||
startRecordingUsage(waitingMark)
|
||||
})
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
|
||||
|
|
@ -156,3 +146,10 @@ func WithPriorityAndFairness(
|
|||
|
||||
})
|
||||
}
|
||||
|
||||
// StartPriorityAndFairnessWatermarkMaintenance starts the goroutines to observe and maintain watermarks for
|
||||
// priority-and-fairness requests.
|
||||
func StartPriorityAndFairnessWatermarkMaintenance(stopCh <-chan struct{}) {
|
||||
startWatermarkMaintenance(watermark, stopCh)
|
||||
startWatermarkMaintenance(waitingMark, stopCh)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -153,6 +153,10 @@ func TestApfSkipLongRunningRequest(t *testing.T) {
|
|||
server := newApfServerWithSingleRequest(decisionSkipFilter, t)
|
||||
defer server.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
|
||||
|
||||
// send a watch request to test skipping long running request
|
||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil {
|
||||
// request should not be rejected
|
||||
|
|
@ -166,6 +170,10 @@ func TestApfRejectRequest(t *testing.T) {
|
|||
server := newApfServerWithSingleRequest(decisionReject, t)
|
||||
defer server.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
|
||||
|
||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
|
@ -187,6 +195,10 @@ func TestApfExemptRequest(t *testing.T) {
|
|||
server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t)
|
||||
defer server.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
|
||||
|
||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
|
@ -209,6 +221,10 @@ func TestApfExecuteRequest(t *testing.T) {
|
|||
server := newApfServerWithSingleRequest(decisionQueuingExecute, t)
|
||||
defer server.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
|
||||
|
||||
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
|
@ -274,6 +290,10 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
|
|||
server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t)
|
||||
defer server.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
|
||||
|
||||
for i := 0; i < concurrentRequests; i++ {
|
||||
var err error
|
||||
go func() {
|
||||
|
|
|
|||
Loading…
Reference in New Issue