From da50ca4c6e7d5d26f69b712098771209d2c59b83 Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Sun, 27 Jun 2021 13:04:20 -0400 Subject: [PATCH] apf: free seats in use after additional latency Kubernetes-commit: d68186452d9150b113489e6a722caf82f898857f --- .../fairqueuing/queueset/queueset.go | 57 ++++++++++++--- .../fairqueuing/queueset/queueset_test.go | 73 +++++++++++++++++++ pkg/util/flowcontrol/request/width.go | 7 ++ 3 files changed, 126 insertions(+), 11 deletions(-) diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 0738c1fde..db71ad61d 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -775,11 +775,49 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool func (qs *queueSet) finishRequestLocked(r *request) { now := qs.clock.Now() qs.totRequestsExecuting-- - qs.totSeatsInUse -= r.Seats() metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1) - metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats()) qs.obsPair.RequestsExecuting.Add(-1) + S := now.Sub(r.startTime).Seconds() + + // TODO: for now we keep the logic localized so it is easier to see + // how the counters are tracked for queueset and queue, in future we + // can refactor to move this function. 
+ releaseSeatsLocked := func() { + defer qs.removeQueueIfEmptyLocked(r) + + qs.totSeatsInUse -= r.Seats() + metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats()) + if r.queue != nil { + r.queue.seatsInUse -= r.Seats() + + if klog.V(6).Enabled() { + klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", + qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, + r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting) + } + } + } + + defer func() { + if r.workEstimate.AdditionalLatency <= 0 { + // release the seats allocated to this request immediately + releaseSeatsLocked() + return + } + + additionalLatency := r.workEstimate.AdditionalLatency + // EventAfterDuration will execute the event func in a new goroutine, + // so the seats allocated to this request will be released after + // AdditionalLatency elapses, this ensures that the additional + // latency has no impact on the user experience. + qs.clock.EventAfterDuration(func(_ time.Time) { + qs.lock.Lock() + defer qs.lock.Unlock() + releaseSeatsLocked() + }, additionalLatency) + }() + if r.queue == nil { if klog.V(6).Enabled() { klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) @@ -787,20 +825,17 @@ func (qs *queueSet) finishRequestLocked(r *request) { return } - S := now.Sub(r.startTime).Seconds() + // request has finished, remove from requests executing + r.queue.requestsExecuting-- // When a request finishes being served, and the actual service time was S, // the queue’s virtual start time is decremented by (G - S)*width. 
r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats()) +} - // request has finished, remove from requests executing - r.queue.requestsExecuting-- - r.queue.seatsInUse -= r.Seats() - - if klog.V(6).Enabled() { - klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", - qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, - r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting) +func (qs *queueSet) removeQueueIfEmptyLocked(r *request) { + if r.queue == nil { + return } // If there are more queues than desired and this one has no diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 485565f88..b72b66dc8 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -993,6 +993,79 @@ func TestSelectQueueLocked(t *testing.T) { } } +func TestFinishRequestLocked(t *testing.T) { + tests := []struct { + name string + workEstimate fcrequest.WorkEstimate + }{ + { + name: "request has additional latency", + workEstimate: fcrequest.WorkEstimate{ + Seats: 10, + AdditionalLatency: time.Minute, + }, + }, + { + name: "request has no additional latency", + workEstimate: fcrequest.WorkEstimate{ + Seats: 10, + }, + }, + } + + metrics.Register() + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + metrics.Reset() + + now := time.Now() + clk, _ := testeventclock.NewFake(now, 0, nil) + qs := &queueSet{ + clock: clk, + obsPair: newObserverPair(clk), + } + queue := &queue{ + requests: newRequestFIFO(), + } + r := &request{ + qs: qs, + queue: queue, + workEstimate: test.workEstimate, + } + + qs.totRequestsExecuting = 111 + qs.totSeatsInUse = 222 + queue.requestsExecuting = 11 + queue.seatsInUse = 22 + + var ( + 
queuesetTotalRequestsExecutingExpected = qs.totRequestsExecuting - 1 + queuesetTotalSeatsInUseExpected = qs.totSeatsInUse - int(test.workEstimate.Seats) + queueRequestsExecutingExpected = queue.requestsExecuting - 1 + queueSeatsInUseExpected = queue.seatsInUse - int(test.workEstimate.Seats) + ) + + qs.finishRequestLocked(r) + + // as soon as AdditionalLatency elapses we expect the seats to be released + clk.SetTime(now.Add(test.workEstimate.AdditionalLatency)) + + if queuesetTotalRequestsExecutingExpected != qs.totRequestsExecuting { + t.Errorf("Expected total requests executing: %d, but got: %d", queuesetTotalRequestsExecutingExpected, qs.totRequestsExecuting) + } + if queuesetTotalSeatsInUseExpected != qs.totSeatsInUse { + t.Errorf("Expected total seats in use: %d, but got: %d", queuesetTotalSeatsInUseExpected, qs.totSeatsInUse) + } + if queueRequestsExecutingExpected != queue.requestsExecuting { + t.Errorf("Expected requests executing for queue: %d, but got: %d", queueRequestsExecutingExpected, queue.requestsExecuting) + } + if queueSeatsInUseExpected != queue.seatsInUse { + t.Errorf("Expected seats in use for queue: %d, but got: %d", queueSeatsInUseExpected, queue.seatsInUse) + } + }) + } +} + func newFIFO(requests ...*request) fifo { l := newRequestFIFO() for i := range requests { diff --git a/pkg/util/flowcontrol/request/width.go b/pkg/util/flowcontrol/request/width.go index 07f458c0b..e1181b627 100644 --- a/pkg/util/flowcontrol/request/width.go +++ b/pkg/util/flowcontrol/request/width.go @@ -19,6 +19,7 @@ package request import ( "fmt" "net/http" + "time" apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/klog/v2" @@ -35,6 +36,12 @@ const ( type WorkEstimate struct { // Seats represents the number of seats associated with this request Seats uint + + // AdditionalLatency specifies the additional duration the seats allocated + // to this request must be reserved after the given request had finished. 
+ // AdditionalLatency should not have any impact on the user experience; the + // caller must not experience this additional latency. + AdditionalLatency time.Duration } // objectCountGetterFunc represents a function that gets the total