Update the logic to pick the best queue in P&F

Kubernetes-commit: 0ecc7ba311ab33b16c5d907ebb1120e3e51a947d
This commit is contained in:
wojtekt 2021-06-17 09:22:00 +02:00 committed by Kubernetes Publisher
parent 462585d974
commit a35bb85f09
3 changed files with 93 additions and 10 deletions

View File

@ -48,6 +48,10 @@ type fifo interface {
// Length returns the number of requests in the list.
Length() int
// Width returns the total width (number of seats) of requests
// in this list.
Width() int
// Walk iterates through the list in order of oldest -> newest
// and executes the specified walkFunc for each request in that order.
//
@ -60,6 +64,8 @@ type fifo interface {
// goroutines without additional locking or coordination.
type requestFIFO struct {
*list.List
width int
}
func newRequestFIFO() fifo {
@ -72,10 +78,20 @@ func (l *requestFIFO) Length() int {
return l.Len()
}
func (l *requestFIFO) Width() int {
return l.width
}
func (l *requestFIFO) Enqueue(req *request) removeFromFIFOFunc {
e := l.PushBack(req)
l.width += req.Seats()
return func() *request {
l.Remove(e)
if e.Value != nil {
l.Remove(e)
e.Value = nil
l.width -= req.Seats()
}
return req
}
}
@ -85,9 +101,16 @@ func (l *requestFIFO) Dequeue() (*request, bool) {
if e == nil {
return nil, false
}
defer l.Remove(e)
defer func() {
l.Remove(e)
e.Value = nil
}()
request, ok := e.Value.(*request)
if ok {
l.width -= request.Seats()
}
return request, ok
}

View File

@ -20,6 +20,8 @@ import (
"math/rand"
"testing"
"time"
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)
func TestFIFOWithEnqueueDequeueSingleRequest(t *testing.T) {
@ -148,6 +150,62 @@ func TestFIFOWithRemoveIsIdempotent(t *testing.T) {
verifyOrder(t, orderExpected, remainingRequests)
}
func TestFIFOWidth(t *testing.T) {
list := newRequestFIFO()
newRequest := func(width uint) *request {
return &request{width: fcrequest.Width{Seats: width}}
}
arrival := []*request{newRequest(1), newRequest(2), newRequest(3)}
removeFn := make([]removeFromFIFOFunc, 0)
width := 0
for i := range arrival {
removeFn = append(removeFn, list.Enqueue(arrival[i]))
width += i + 1
if list.Width() != width {
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
}
}
for i := range removeFn {
removeFn[i]()
width -= i + 1
if list.Width() != width {
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
}
// check idempotency
removeFn[i]()
if list.Width() != width {
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
}
}
// Check second type of idempotency: Dequeue + removeFn.
for i := range arrival {
removeFn[i] = list.Enqueue(arrival[i])
width += i + 1
}
for i := range arrival {
if _, ok := list.Dequeue(); !ok {
t.Errorf("Unexpected failed dequeue: %d", i)
}
width -= i + 1
if list.Width() != width {
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
}
removeFn[i]()
if list.Width() != width {
t.Errorf("Expected width: %d, but got: %d", width, list.Width())
}
}
}
func TestFIFOWithWalk(t *testing.T) {
list := newRequestFIFO()

View File

@ -438,7 +438,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
// returns nil in the case that there is no available concurrency or
// the queuelengthlimit has been reached
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
// Start with the shuffle sharding, to pick a queue.
// Start with the shuffle sharding, to pick a queue.
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
queue := qs.queues[queueIdx]
// The next step is the logic to reject requests that have been waiting too long
@ -472,16 +472,18 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
// using the given hashValue and the shuffle sharding parameters of the queueSet.
func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int {
bestQueueIdx := -1
bestQueueLen := int(math.MaxInt32)
bestQueueWidth := int(math.MaxInt32)
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
qs.dealer.Deal(hashValue, func(queueIdx int) {
thisLen := qs.queues[queueIdx].requests.Length()
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen)
if thisLen < bestQueueLen {
bestQueueIdx, bestQueueLen = queueIdx, thisLen
// TODO: Consider taking into account `additional latency` of requests
// in addition to their widths.
thisWidth := qs.queues[queueIdx].requests.Width()
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of width %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisWidth)
if thisWidth < bestQueueWidth {
bestQueueIdx, bestQueueWidth = queueIdx, thisWidth
}
})
klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting)
klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueWidth, qs.queues[bestQueueIdx].requestsExecuting)
return bestQueueIdx
}
@ -531,7 +533,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
// Otherwise enqueues and returns true.
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
queue := request.queue
curQueueLength := queue.requests.Length()
curQueueLength := queue.requests.Width()
// rejects the newly arrived request if resource criteria not met
if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit &&
curQueueLength >= qs.qCfg.QueueLengthLimit {