apf: free seats in use after additional latency
Kubernetes-commit: d68186452d9150b113489e6a722caf82f898857f
This commit is contained in:
parent
464eee4062
commit
da50ca4c6e
|
@ -775,11 +775,49 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
|
|||
func (qs *queueSet) finishRequestLocked(r *request) {
|
||||
now := qs.clock.Now()
|
||||
qs.totRequestsExecuting--
|
||||
qs.totSeatsInUse -= r.Seats()
|
||||
metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
|
||||
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats())
|
||||
qs.obsPair.RequestsExecuting.Add(-1)
|
||||
|
||||
S := now.Sub(r.startTime).Seconds()
|
||||
|
||||
// TODO: for now we keep the logic localized so it is easier to see
|
||||
// how the counters are tracked for queueset and queue; in the future we
|
||||
// can refactor to move this function.
|
||||
releaseSeatsLocked := func() {
|
||||
defer qs.removeQueueIfEmptyLocked(r)
|
||||
|
||||
qs.totSeatsInUse -= r.Seats()
|
||||
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats())
|
||||
if r.queue != nil {
|
||||
r.queue.seatsInUse -= r.Seats()
|
||||
|
||||
if klog.V(6).Enabled() {
|
||||
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
|
||||
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index,
|
||||
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if r.workEstimate.AdditionalLatency <= 0 {
|
||||
// release the seats allocated to this request immediately
|
||||
releaseSeatsLocked()
|
||||
return
|
||||
}
|
||||
|
||||
additionalLatency := r.workEstimate.AdditionalLatency
|
||||
// EventAfterDuration will execute the event func in a new goroutine,
|
||||
// so the seats allocated to this request will be released after
|
||||
// AdditionalLatency elapses; this ensures that the additional
|
||||
// latency has no impact on the user experience.
|
||||
qs.clock.EventAfterDuration(func(_ time.Time) {
|
||||
qs.lock.Lock()
|
||||
defer qs.lock.Unlock()
|
||||
releaseSeatsLocked()
|
||||
}, additionalLatency)
|
||||
}()
|
||||
|
||||
if r.queue == nil {
|
||||
if klog.V(6).Enabled() {
|
||||
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
|
||||
|
@ -787,20 +825,17 @@ func (qs *queueSet) finishRequestLocked(r *request) {
|
|||
return
|
||||
}
|
||||
|
||||
S := now.Sub(r.startTime).Seconds()
|
||||
// request has finished, remove from requests executing
|
||||
r.queue.requestsExecuting--
|
||||
|
||||
// When a request finishes being served, and the actual service time was S,
|
||||
// the queue’s virtual start time is decremented by (G - S)*width.
|
||||
r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats())
|
||||
}
|
||||
|
||||
// request has finished, remove from requests executing
|
||||
r.queue.requestsExecuting--
|
||||
r.queue.seatsInUse -= r.Seats()
|
||||
|
||||
if klog.V(6).Enabled() {
|
||||
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
|
||||
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index,
|
||||
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting)
|
||||
func (qs *queueSet) removeQueueIfEmptyLocked(r *request) {
|
||||
if r.queue == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// If there are more queues than desired and this one has no
|
||||
|
|
|
@ -993,6 +993,79 @@ func TestSelectQueueLocked(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestFinishRequestLocked checks that finishing a request decrements the
// executing-request and seats-in-use counters of both the queueSet and the
// request's queue, for a request with and without AdditionalLatency.
func TestFinishRequestLocked(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
workEstimate fcrequest.WorkEstimate
|
||||
}{
|
||||
{
|
||||
name: "request has additional latency",
|
||||
workEstimate: fcrequest.WorkEstimate{
|
||||
Seats: 10,
|
||||
AdditionalLatency: time.Minute,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "request has no additional latency",
|
||||
workEstimate: fcrequest.WorkEstimate{
|
||||
Seats: 10,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
metrics.Register()
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
metrics.Reset()
|
||||
|
||||
now := time.Now()
|
||||
clk, _ := testeventclock.NewFake(now, 0, nil)
|
||||
qs := &queueSet{
|
||||
clock: clk,
|
||||
obsPair: newObserverPair(clk),
|
||||
}
|
||||
queue := &queue{
|
||||
requests: newRequestFIFO(),
|
||||
}
|
||||
r := &request{
|
||||
qs: qs,
|
||||
queue: queue,
|
||||
workEstimate: test.workEstimate,
|
||||
}
|
||||
|
||||
// seed the queueSet and queue counters with distinct non-zero values
|
||||
qs.totRequestsExecuting = 111
|
||||
qs.totSeatsInUse = 222
|
||||
queue.requestsExecuting = 11
|
||||
queue.seatsInUse = 22
|
||||
|
||||
// expected counters once the request has finished and its seats are
// released: one fewer executing request and Seats fewer seats in use,
// for both the queueSet and the queue
var (
|
||||
queuesetTotalRequestsExecutingExpected = qs.totRequestsExecuting - 1
|
||||
queuesetTotalSeatsInUseExpected = qs.totSeatsInUse - int(test.workEstimate.Seats)
|
||||
queueRequestsExecutingExpected = queue.requestsExecuting - 1
|
||||
queueSeatsInUseExpected = queue.seatsInUse - int(test.workEstimate.Seats)
|
||||
)
|
||||
|
||||
qs.finishRequestLocked(r)
|
||||
|
||||
// as soon as AdditionalLatency elapses we expect the seats to be released
|
||||
// (with a zero AdditionalLatency this sets the fake clock to now again)
// NOTE(review): the assertions below read the counters right after SetTime;
// this assumes the fake clock has already run the scheduled release by
// then - confirm against the fake event clock implementation.
clk.SetTime(now.Add(test.workEstimate.AdditionalLatency))
|
||||
|
||||
if queuesetTotalRequestsExecutingExpected != qs.totRequestsExecuting {
|
||||
t.Errorf("Expected total requests executing: %d, but got: %d", queuesetTotalRequestsExecutingExpected, qs.totRequestsExecuting)
|
||||
}
|
||||
if queuesetTotalSeatsInUseExpected != qs.totSeatsInUse {
|
||||
t.Errorf("Expected total seats in use: %d, but got: %d", queuesetTotalSeatsInUseExpected, qs.totSeatsInUse)
|
||||
}
|
||||
if queueRequestsExecutingExpected != queue.requestsExecuting {
|
||||
t.Errorf("Expected requests executing for queue: %d, but got: %d", queueRequestsExecutingExpected, queue.requestsExecuting)
|
||||
}
|
||||
if queueSeatsInUseExpected != queue.seatsInUse {
|
||||
t.Errorf("Expected seats in use for queue: %d, but got: %d", queueSeatsInUseExpected, queue.seatsInUse)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newFIFO(requests ...*request) fifo {
|
||||
l := newRequestFIFO()
|
||||
for i := range requests {
|
||||
|
|
|
@ -19,6 +19,7 @@ package request
|
|||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/klog/v2"
|
||||
|
@ -35,6 +36,12 @@ const (
|
|||
type WorkEstimate struct {
|
||||
// Seats represents the number of seats associated with this request.
|
||||
Seats uint
|
||||
|
||||
// AdditionalLatency specifies the additional duration the seats allocated
|
||||
// to this request must be reserved after the given request has finished.
|
||||
// AdditionalLatency should not have any impact on the user experience; the
|
||||
// caller must not experience this additional latency.
|
||||
AdditionalLatency time.Duration
|
||||
}
|
||||
|
||||
// objectCountGetterFunc represents a function that gets the total
|
||||
|
|
Loading…
Reference in New Issue