APF: Dynamically compute retry-after based on history

Kubernetes-commit: 23ac0fdaa52209c06eacf3613101174ea77ec42b
This commit is contained in:
Wojciech Tyczyński 2023-04-20 10:18:48 +02:00 committed by Kubernetes Publisher
parent 429762b215
commit 6c23e503a3
4 changed files with 415 additions and 2 deletions

View File

@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
@ -72,6 +73,11 @@ type priorityAndFairnessHandler struct {
longRunningRequestCheck apirequest.LongRunningRequestCheck
fcIfc utilflowcontrol.Interface
workEstimator flowcontrolrequest.WorkEstimatorFunc
// droppedRequests tracks the history of dropped requests for
// the purpose of computing RetryAfter header to avoid system
// overload.
droppedRequests utilflowcontrol.DroppedRequestsTracker
}
func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Request) {
@ -288,7 +294,11 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
epmetrics.RecordDroppedRequest(r, requestInfo, epmetrics.APIServerComponent, isMutatingRequest)
epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
tooManyRequests(r, w, retryAfter)
h.droppedRequests.RecordDroppedRequest(classification.PriorityLevelName)
// TODO(wojtek-t): Idea from deads2k: we can consider some jittering and in case of non-int
// number, just return the truncated result and sleep the remainder server-side.
tooManyRequests(r, w, strconv.Itoa(int(h.droppedRequests.GetRetryAfter(classification.PriorityLevelName))))
}
}
@ -317,6 +327,7 @@ func WithPriorityAndFairness(
longRunningRequestCheck: longRunningRequestCheck,
fcIfc: fcIfc,
workEstimator: workEstimator,
droppedRequests: utilflowcontrol.NewDroppedRequestsTracker(),
}
return http.HandlerFunc(priorityAndFairnessHandler.Handle)
}

View File

@ -360,11 +360,12 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
func (f *fakeWatchApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
_ func() fcrequest.WorkEstimate,
_ fq.QueueNoteFn,
execFn func(),
) {
noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
canExecute := false
func() {
f.lock.Lock()

View File

@ -0,0 +1,231 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"sync"
"sync/atomic"
"time"
"k8s.io/utils/clock"
)
const (
// maxRetryAfter represents the maximum possible retryAfter.
maxRetryAfter = int64(32)
)
// DroppedRequestsTracker is an interface that allows tracking
// a history od dropped requests in the system for the purpose
// of adjusting RetryAfter header to avoid system overload.
type DroppedRequestsTracker interface {
// RecordDroppedRequest records a request that was just
// dropped from processing.
RecordDroppedRequest(plName string)
// GetRetryAfter returns the current suggested value of
// RetryAfter value.
GetRetryAfter(plName string) int64
}
// unixStat keeps a statistic how many requests were dropped within
// a single second.
type unixStat struct {
unixTime int64
requests int64
}
type droppedRequestsStats struct {
lock sync.RWMutex
// history stores the history of dropped requests.
history []unixStat
// To reduce lock-contention, we store the information about
// the current second here, which we can then access under
// reader lock.
currentUnix int64
currentCount atomic.Int64
retryAfter atomic.Int64
retryAfterUpdateUnix int64
}
func newDroppedRequestsStats(nowUnix int64) *droppedRequestsStats {
result := &droppedRequestsStats{
// We assume that we can bump at any time after first dropped request.
retryAfterUpdateUnix: 0,
}
result.retryAfter.Store(1)
return result
}
func (s *droppedRequestsStats) recordDroppedRequest(unixTime int64) {
// Short path - if the current second matches passed time,
// just update the stats.
if done := func() bool {
s.lock.RLock()
defer s.lock.RUnlock()
if s.currentUnix == unixTime {
s.currentCount.Add(1)
return true
}
return false
}(); done {
return
}
// We trigger the change of <currentUnix>.
s.lock.Lock()
defer s.lock.Unlock()
if s.currentUnix == unixTime {
s.currentCount.Add(1)
return
}
s.updateHistory(s.currentUnix, s.currentCount.Load())
s.currentUnix = unixTime
s.currentCount.Store(1)
// We only consider updating retryAfter when bumping the current second.
// However, given that we didn't report anything for the current second,
// we recompute it based on statistics from the previous one.
s.updateRetryAfterIfNeededLocked(unixTime)
}
func (s *droppedRequestsStats) updateHistory(unixTime int64, count int64) {
s.history = append(s.history, unixStat{unixTime: unixTime, requests: count})
startIndex := 0
for ; startIndex < len(s.history) && unixTime-s.history[startIndex].unixTime > maxRetryAfter; startIndex++ {
}
if startIndex > 0 {
s.history = s.history[startIndex:]
}
}
// updateRetryAfterIfNeededLocked updates the retryAfter based on the number of
// dropped requests in the last `retryAfter` seconds:
// - if there were less than `retryAfter` dropped requests, it decreases
// retryAfter
// - if there were at least 3*`retryAfter` dropped requests, it increases
// retryAfter
//
// The rationale behind these numbers being fairly low is that APF is queuing
// requests and rejecting (dropping) them is a last resort, which is not expected
// unless a given priority level is actually overloaded.
//
// Additionally, we rate-limit the increases of retryAfter to wait at least
// `retryAfter' seconds after the previous increase to avoid multiple bumps
// on a single spike.
//
// We're working with the interval [unixTime-retryAfter, unixTime).
func (s *droppedRequestsStats) updateRetryAfterIfNeededLocked(unixTime int64) {
retryAfter := s.retryAfter.Load()
droppedRequests := int64(0)
if len(s.history) > 0 {
for i := len(s.history) - 1; i >= 0; i-- {
if unixTime-s.history[i].unixTime > retryAfter {
break
}
if s.history[i].unixTime < unixTime {
droppedRequests += s.history[i].requests
}
}
}
if unixTime-s.retryAfterUpdateUnix >= retryAfter && droppedRequests >= 3*retryAfter {
// We try to mimic the TCP algorithm and thus are doubling
// the retryAfter here.
retryAfter *= 2
if retryAfter >= maxRetryAfter {
retryAfter = maxRetryAfter
}
s.retryAfter.Store(retryAfter)
s.retryAfterUpdateUnix = unixTime
return
}
if droppedRequests < retryAfter && retryAfter > 1 {
// We try to mimc the TCP algorithm and thus are linearly
// scaling down the retryAfter here.
retryAfter--
s.retryAfter.Store(retryAfter)
return
}
}
// droppedRequestsTracker implement DroppedRequestsTracker interface
// for the purpose of adjusting RetryAfter header for newly dropped
// requests to avoid system overload.
type droppedRequestsTracker struct {
now func() time.Time
lock sync.RWMutex
plStats map[string]*droppedRequestsStats
}
// NewDroppedRequestsTracker is creating a new instance of
// DroppedRequestsTracker.
func NewDroppedRequestsTracker() DroppedRequestsTracker {
return newDroppedRequestsTracker(clock.RealClock{}.Now)
}
func newDroppedRequestsTracker(now func() time.Time) *droppedRequestsTracker {
return &droppedRequestsTracker{
now: now,
plStats: make(map[string]*droppedRequestsStats),
}
}
func (t *droppedRequestsTracker) RecordDroppedRequest(plName string) {
unixTime := t.now().Unix()
stats := func() *droppedRequestsStats {
// The list of priority levels should change very infrequently,
// so in almost all cases, the fast path should be enough.
t.lock.RLock()
if plStats, ok := t.plStats[plName]; ok {
t.lock.RUnlock()
return plStats
}
t.lock.RUnlock()
// Slow path taking writer lock to update the map.
t.lock.Lock()
defer t.lock.Unlock()
if plStats, ok := t.plStats[plName]; ok {
return plStats
}
stats := newDroppedRequestsStats(unixTime)
t.plStats[plName] = stats
return stats
}()
stats.recordDroppedRequest(unixTime)
}
func (t *droppedRequestsTracker) GetRetryAfter(plName string) int64 {
t.lock.RLock()
defer t.lock.RUnlock()
if plStats, ok := t.plStats[plName]; ok {
return plStats.retryAfter.Load()
}
return 1
}

View File

@ -0,0 +1,170 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"fmt"
"sync"
"testing"
"time"
testingclock "k8s.io/utils/clock/testing"
)
func TestDroppedRequestsTracker(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now())
tracker := newDroppedRequestsTracker(fakeClock.Now)
// The following table represents the list over time of:
// - seconds elapsed (as computed since the initial time)
// - requests that will be recorded as dropped in a current second
steps := []struct {
secondsElapsed int
// droppedRequests is the number of requests to drop, after
// secondsElapsed.
droppedRequests int
// retryAfter is the expected retryAfter after all dropped
// requests are recorded via RecordDroppedRequest.
retryAfter int64
}{
{secondsElapsed: 0, droppedRequests: 5, retryAfter: 1},
{secondsElapsed: 1, droppedRequests: 11, retryAfter: 2},
// Check that we don't bump immediately after despite
// multiple dropped requests.
{secondsElapsed: 2, droppedRequests: 1, retryAfter: 2},
{secondsElapsed: 3, droppedRequests: 11, retryAfter: 4},
{secondsElapsed: 4, droppedRequests: 1, retryAfter: 4},
{secondsElapsed: 7, droppedRequests: 1, retryAfter: 8},
{secondsElapsed: 11, droppedRequests: 1, retryAfter: 8},
{secondsElapsed: 15, droppedRequests: 1, retryAfter: 7},
{secondsElapsed: 17, droppedRequests: 1, retryAfter: 6},
{secondsElapsed: 21, droppedRequests: 14, retryAfter: 5},
{secondsElapsed: 22, droppedRequests: 1, retryAfter: 10},
}
for i, step := range steps {
secondsToAdvance := step.secondsElapsed
if i > 0 {
secondsToAdvance -= steps[i-1].secondsElapsed
}
fakeClock.Step(time.Duration(secondsToAdvance) * time.Second)
// Record only first dropped request and recompute retryAfter.
for r := 0; r < step.droppedRequests; r++ {
tracker.RecordDroppedRequest("pl")
}
if retryAfter := tracker.GetRetryAfter("pl"); retryAfter != step.retryAfter {
t.Errorf("Unexpected retryAfter: %v, expected: %v", retryAfter, step.retryAfter)
}
}
}
func TestDroppedRequestsTrackerPLIndependent(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now())
tracker := newDroppedRequestsTracker(fakeClock.Now)
// Report single dropped requests in multiple PLs.
// Validate if RetryAfter isn't bumped next second.
for i := 0; i < 10; i++ {
tracker.RecordDroppedRequest(fmt.Sprintf("pl-%d", i))
}
fakeClock.Step(time.Second)
for i := 0; i < 10; i++ {
tracker.RecordDroppedRequest(fmt.Sprintf("pl-%d", i))
retryAfter := tracker.GetRetryAfter(fmt.Sprintf("pl-%d", i))
if retryAfter != 1 {
t.Errorf("Unexpected retryAfter for pl-%d: %v", i, retryAfter)
}
}
// Record few droped requests on a single PL.
// Validate that RetryAfter is bumped only for this PL.
for i := 0; i < 5; i++ {
tracker.RecordDroppedRequest("pl-0")
}
fakeClock.Step(time.Second)
for i := 0; i < 10; i++ {
tracker.RecordDroppedRequest(fmt.Sprintf("pl-%d", i))
retryAfter := tracker.GetRetryAfter(fmt.Sprintf("pl-%d", i))
switch i {
case 0:
if retryAfter != 2 {
t.Errorf("Unexpected retryAfter for pl-0: %v", retryAfter)
}
default:
if retryAfter != 1 {
t.Errorf("Unexpected retryAfter for pl-%d: %v", i, retryAfter)
}
}
}
// Validate also PL for which no dropped requests was recorded.
if retryAfter := tracker.GetRetryAfter("other-pl"); retryAfter != 1 {
t.Errorf("Unexpected retryAfter for other-pl: %v", retryAfter)
}
}
func BenchmarkDroppedRequestsTracker(b *testing.B) {
b.StopTimer()
fakeClock := testingclock.NewFakeClock(time.Now())
tracker := newDroppedRequestsTracker(fakeClock.Now)
startCh := make(chan struct{})
wg := sync.WaitGroup{}
numPLs := 5
// For all `numPLs` priority levels, create b.N workers each
// of which will try to record a dropped request every 100ms
// with a random jitter.
for i := 0; i < numPLs; i++ {
plName := fmt.Sprintf("priority-level-%d", i)
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<-startCh
for a := 0; a < 5; a++ {
tracker.RecordDroppedRequest(plName)
time.Sleep(25 * time.Millisecond)
}
}()
}
}
// Time-advancing goroutine.
stopCh := make(chan struct{})
timeWg := sync.WaitGroup{}
timeWg.Add(1)
go func() {
defer timeWg.Done()
for {
select {
case <-stopCh:
return
case <-time.After(25 * time.Millisecond):
fakeClock.Step(time.Second)
}
}
}()
b.StartTimer()
close(startCh)
wg.Wait()
close(stopCh)
timeWg.Wait()
}