Brushed up fairqueuing package

This commit responds to the comments on PR #85192 that were not yet
addressed at the time it merged, apart from the one fixed in PR

Generalized fairqueuing to allow for zero queues, to support a
priority level that limits concurrency but does no queuing.

Kubernetes-commit: b123a43e7117e977606bacd31d77f4a30d2ed212
Mike Spreitzer 2019-11-13 23:34:10 -05:00 committed by Kubernetes Publisher
parent ccb472b74e
commit d0f4b93c91
8 changed files with 270 additions and 190 deletions

View File

@ -70,7 +70,9 @@ type QueueSetConfig struct {
Name string
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
ConcurrencyLimit int
// DesiredNumQueues is the number of queues that the API says should exist now
// DesiredNumQueues is the number of queues that the API says
// should exist now. This may be zero, in which case
// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
DesiredNumQueues int
// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
QueueLengthLimit int
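For illustration, a hedged sketch of a config that uses the new zero-queue mode; the name and numbers are hypothetical:

	// A priority level that limits concurrency to 4 but does no queuing.
	// With DesiredNumQueues == 0, the QueueLengthLimit, HandSize, and
	// RequestWaitLimit fields are ignored.
	config := fq.QueueSetConfig{
		Name:             "no-queuing-example", // hypothetical
		ConcurrencyLimit: 4,
		DesiredNumQueues: 0,
	}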

View File

@ -20,7 +20,7 @@ import (
"sync"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/promise"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
)
// lockingPromise implements LockingMutable based on a condition
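For context, a hedged sketch of how the queueset code in this commit (below) uses such a promise; the lock and counter are assumed to come from the surrounding queueSet:

	// The promise is bound to a lock. A writer calls Set (or SetLocked
	// while already holding the lock); a reader blocks in GetLocked
	// until some value has been set.
	decision := lockingpromise.NewLockingPromise(&qs.lock, qs.counter)
	go func() { decision.Set(decisionCancel) }() // e.g. on context cancellation
	qs.lock.Lock()
	value := decision.GetLocked() // blocks until a value arrives
	qs.lock.Unlock()
	_ = value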

View File

@ -14,33 +14,31 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package queueset
// This package implements a technique called "fair queuing for server
// requests". One QueueSet is a set of queues operating according to
// this technique.
// Package queueset implements a technique called "fair queuing for
// server requests". One QueueSet is a set of queues operating
// according to this technique.
//
// Fair queuing for server requests is inspired by the fair queuing
// technique from the world of networking. You can find a good paper
// on that at https://dl.acm.org/citation.cfm?doid=75247.75248 or
// http://people.csail.mit.edu/imcgraw/links/research/pubs/networks/WFQ.pdf
// and there is an implementation outline in the Wikipedia article at
// https://en.wikipedia.org/wiki/Fair_queuing .
//
// Fair queuing for server requests differs from traditional fair
// queuing in three ways: (1) we are dispatching requests to be
// executed within a process rather than transmitting packets on a
// network link, (2) multiple requests can be executing at once, and
// (3) the service time (execution duration) is not known until the
// execution completes.
// queuing in three ways: (1) we are dispatching application layer
// requests to a server rather than transmitting packets on a network
// link, (2) multiple requests can be executing at once, and (3) the
// service time (execution duration) is not known until the execution
// completes.
//
// The first two differences can easily be handled by straightforward
// adaptation of the concept called "R(t)" in the original paper and
// "virtual time" in the implementation outline. In that
// implementation outline, the notation now() is used to mean reading
// the virtual clock. In the original paper's terms, "R(t)" is the
// number of "rounds" that have been completed at real time t, where a
// round consists of virtually transmitting one bit from every
// number of "rounds" that have been completed at real time t ---
// where a round consists of virtually transmitting one bit from every
// non-empty queue in the router (regardless of which queue holds the
// packet that is really being transmitted at the moment); in this
// conception, a packet is considered to be "in" its queue until the
@ -55,12 +53,12 @@ package queueset
// respect to t is
//
// 1 / NEQ(t) .
//
// To generalize from transmitting one packet at a time to executing C
// requests at a time, that derivative becomes
//
// C / NEQ(t) .
//
// However, sometimes there are fewer than C requests available to
// execute. For a given queue "q", let us also write "reqs(q, t)" for
// the number of requests of that queue that are executing at that
@ -79,25 +77,25 @@ package queueset
// real nanosecond). Where the networking implementation outline adds
// packet size to a virtual time, in our version this corresponds to
// adding a service time (i.e., duration) to virtual time.
//
// The third difference is handled by modifying the algorithm to
// dispatch based on an initial guess at the request's service time
// (duration) and then make the corresponding adjustments once the
// request's actual service time is known. This is similar, although
// not exactly isomorphic, to the original paper's adjustment by
// `$delta` for the sake of promptness.
// `$\delta$` for the sake of promptness.
//
// For implementation simplicity (see below), let us use the same
// initial service time guess for every request; call that duration
// G. A good choice might be the service time limit (1
// minute). Different guesses will give slightly different dynamics,
// but any positive number can be used for G without ruining the
// long-term behavior.
//
// As in ordinary fair queuing, there is a bound on divergence from
// the ideal. In plain fair queuing the bound is one packet; in our
// version it is C requests.
//
// To support efficiently making the necessary adjustments once a
// request's actual service time is known, the virtual finish time of
// a request and the last virtual finish time of a queue are not
@ -118,3 +116,5 @@ package queueset
// queue's virtual start time is advanced by G. When a request
// finishes being served, and the actual service time was S, the
// queue's virtual start time is decremented by G - S.
//
package queueset
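To make the virtual clock concrete, a hedged sketch of the advancement rule described above (the function name is hypothetical; compare getVirtualTimeRatio below):

	// Advance virtual time by (real elapsed seconds) times the ratio
	// min(totalExecuting, C) / NEQ(t), where NEQ(t) is the number of
	// non-empty (active) queues. With no active queues the virtual
	// clock idles.
	func advanceVirtualTime(vt, elapsedSeconds float64, totalExecuting, c, activeQueues int) float64 {
		if activeQueues == 0 {
			return vt
		}
		executing := totalExecuting
		if executing > c {
			executing = c
		}
		return vt + elapsedSeconds*float64(executing)/float64(activeQueues)
	}

As a worked instance of the G adjustment: with G = 60s, dispatching a request advances its queue's virtual start time by 60 virtual seconds; if the actual service time turns out to be S = 0.25s, completion subtracts G - S = 59.75, so the net advance is exactly S.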

View File

@ -28,8 +28,8 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/apiserver/pkg/util/promise/lockingpromise"
"k8s.io/apiserver/pkg/util/shufflesharding"
"k8s.io/klog"
)
@ -56,48 +56,62 @@ func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter)
// the QueueSet interface. The clock, GoRoutineCounter, and estimated
// service time should not be changed; the fields listed after the
// lock must be accessed only while holding the lock.
// This is not yet designed to support limiting concurrency without
// queuing (this will need to be added soon).
type queueSet struct {
clock clock.PassiveClock
counter counter.GoRoutineCounter
estimatedServiceTime float64
lock sync.Mutex
// config holds the current configuration. Its DesiredNumQueues
// may be less than the current number of queues. If its
// DesiredNumQueues is zero then its other queuing parameters
// retain the settings they had when DesiredNumQueues was last
// non-zero (if ever).
config fq.QueueSetConfig
// queues may be longer than the desired number, while the excess
// queues are still draining.
queues []*queue
// virtualTime is the number of virtual seconds since process startup
virtualTime float64
// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
lastRealTime time.Time
// robinIndex is the index of the last queue dispatched
robinIndex int
// numRequestsEnqueued is the number of requests currently waiting
// in a queue (e.g., incremented on Enqueue, decremented on Dequeue)
numRequestsEnqueued int
// totRequestsWaiting is the sum, over all the queues, of the
// number of requests waiting in that queue
totRequestsWaiting int
// totRequestsExecuting is the total number of requests of this
// queueSet that are currently executing. That is the same as the
// sum, over all the queues, of the number of requests executing
// from that queue.
totRequestsExecuting int
emptyHandler fq.EmptyHandler
dealer *shufflesharding.Dealer
}
// NewQueueSet creates a new QueueSet object
// NewQueueSet creates a new QueueSet object.
// There is a new QueueSet created for each priority level.
func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) {
dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
if err != nil {
return nil, errors.Wrap(err, "shuffle sharding dealer creation failed")
}
fq := &queueSet{
config: config,
counter: qsf.counter,
queues: createQueues(config.DesiredNumQueues, 0),
clock: qsf.clock,
virtualTime: 0,
counter: qsf.counter,
estimatedServiceTime: 60,
config: config,
lastRealTime: qsf.clock.Now(),
dealer: dealer,
}
err := fq.SetConfiguration(config)
if err != nil {
return nil, err
}
return fq, nil
}
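A hedged usage sketch of the factory path above; clock and counter construction is elided:

	// Hypothetical construction of a QueueSet for one priority level.
	qsf := NewQueueSetFactory(clk, grc) // clk is a clock.PassiveClock, grc a counter.GoRoutineCounter
	qs, err := qsf.NewQueueSet(config)  // config is an fq.QueueSetConfig as above
	if err != nil {
		// reported when DesiredNumQueues > 0 and the shuffle-sharding
		// parameters (DesiredNumQueues, HandSize) are invalid
		return err
	}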
@ -106,7 +120,7 @@ func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, e
func createQueues(n, baseIndex int) []*queue {
fqqueues := make([]*queue, n)
for i := 0; i < n; i++ {
fqqueues[i] = &queue{Index: baseIndex + i, Requests: make([]*request, 0)}
fqqueues[i] = &queue{index: baseIndex + i, requests: make([]*request, 0)}
}
return fqqueues
}
@ -118,12 +132,14 @@ func createQueues(n, baseIndex int) []*queue {
func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
var dealer *shufflesharding.Dealer
dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
if config.DesiredNumQueues > 0 {
var err error
dealer, err = shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
if err != nil {
return errors.Wrap(err, "shuffle sharding dealer creation failed")
}
// Adding queues is the only thing that requires immediate action
// Removing queues is handled by omitting indexes >DesiredNum from
// chooseQueueIndexLocked
@ -132,6 +148,11 @@ func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error {
qs.queues = append(qs.queues,
createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...)
}
} else {
config.QueueLengthLimit = qs.config.QueueLengthLimit
config.HandSize = qs.config.HandSize
config.RequestWaitLimit = qs.config.RequestWaitLimit
}
qs.config = config
qs.dealer = dealer
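A hedged sketch of the retention rule just above: a reconfiguration to zero queues ignores, and preserves, the stored queuing parameters, so a later switch back to queuing finds them intact:

	// Hypothetical reconfiguration sequence (error handling elided).
	// Assume qs currently has DesiredNumQueues = 8, HandSize = 3.
	cfg := fq.QueueSetConfig{Name: "example", ConcurrencyLimit: 4, DesiredNumQueues: 0}
	_ = qs.SetConfiguration(cfg) // queuing off; the old QueueLengthLimit, HandSize,
	                             // and RequestWaitLimit are retained internally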
@ -162,12 +183,15 @@ func (qs *queueSet) Quiesce(eh fq.EmptyHandler) {
qs.maybeForkEmptyHandlerLocked()
}
// Values passed through a request's Decision
// A decision about a request
type requestDecision int
// Values passed through a request's decision
const (
DecisionExecute = "execute"
DecisionReject = "reject"
DecisionCancel = "cancel"
DecisionTryAnother = "tryAnother"
decisionExecute requestDecision = iota
decisionReject
decisionCancel
decisionTryAnother
)
// Wait uses the given hashValue as the source of entropy as it
@ -186,14 +210,26 @@ const (
// irrelevant.
func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) {
var req *request
decision := func() string {
decision := func() requestDecision {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
// A call to Wait while the system is quiescing will be rebuffed by
// returning `tryAnother=true`.
if qs.emptyHandler != nil {
klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.config.Name, descr1, descr2)
return DecisionTryAnother
return decisionTryAnother
}
// ========================================================================
// Step 0:
// Apply only concurrency limit, if zero queues desired
if qs.config.DesiredNumQueues < 1 {
if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit {
klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.config.Name, descr1, descr2, qs.totRequestsExecuting, qs.config.ConcurrencyLimit)
return decisionReject
}
req = qs.dispatchSansQueue(descr1, descr2)
return decisionExecute
}
// ========================================================================
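For orientation, a hedged sketch of how a caller consumes Wait's three results (the handler is hypothetical):

	// Hypothetical caller of Wait.
	tryAnother, execute, afterExecution := qs.Wait(ctx, hashValue, descr1, descr2)
	switch {
	case tryAnother:
		// this QueueSet is quiescing; dispatch to another one
	case !execute:
		// rejected: queue full, wait timed out, canceled, or (with zero
		// queues) the concurrency limit was reached
	default:
		serve()          // hypothetical request handler
		afterExecution() // must be called once the work completes
	}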
@ -209,7 +245,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
if req == nil {
klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.config.Name, descr1, descr2)
metrics.AddReject(qs.config.Name, "queue-full")
return DecisionReject
return decisionReject
}
// ========================================================================
@ -239,7 +275,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
select {
case <-doneCh:
klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.config.Name, descr1, descr2)
req.Decision.Set(DecisionCancel)
req.decision.Set(decisionCancel)
}
qs.goroutineDoneOrBlocked()
}()
@ -249,30 +285,30 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
// Step 4:
// The final step in Wait is to wait on a decision from
// somewhere and then act on it.
decisionAny := req.Decision.GetLocked()
var decisionStr string
switch d := decisionAny.(type) {
case string:
decisionStr = d
decisionAny := req.decision.GetLocked()
var decision requestDecision
switch dec := decisionAny.(type) {
case requestDecision:
decision = dec
default:
klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.config.Name, decisionAny, decisionAny, descr1, descr2)
decisionStr = DecisionExecute
decision = decisionExecute
}
switch decisionStr {
case DecisionReject:
switch decision {
case decisionReject:
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.config.Name, descr1, descr2)
metrics.AddReject(qs.config.Name, "time-out")
case DecisionCancel:
case decisionCancel:
qs.syncTimeLocked()
// TODO(aaron-prindle) add metrics to these two cases
if req.IsWaiting {
if req.isWaiting {
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.config.Name, descr1, descr2)
// remove the request from the queue as it has timed out
for i := range req.Queue.Requests {
if req == req.Queue.Requests[i] {
for i := range req.queue.requests {
if req == req.queue.requests[i] {
// remove the request
req.Queue.Requests = append(req.Queue.Requests[:i],
req.Queue.Requests[i+1:]...)
req.queue.requests = append(req.queue.requests[:i],
req.queue.requests[i+1:]...)
break
}
}
@ -284,17 +320,15 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.config.Name, descr1, descr2)
}
}
return decisionStr
return decision
}()
switch decision {
case DecisionTryAnother:
case decisionTryAnother:
return true, false, func() {}
case DecisionReject:
return false, false, func() {}
case DecisionCancel:
case decisionReject, decisionCancel:
return false, false, func() {}
default:
if decision != DecisionExecute {
if decision != decisionExecute {
klog.Errorf("Impossible decision %q", decision)
}
return false, true, func() {
@ -317,9 +351,9 @@ func (qs *queueSet) lockAndSyncTime() {
// lock and before modifying the state of any queue.
func (qs *queueSet) syncTimeLocked() {
realNow := qs.clock.Now()
timesincelast := realNow.Sub(qs.lastRealTime).Seconds()
timeSinceLast := realNow.Sub(qs.lastRealTime).Seconds()
qs.lastRealTime = realNow
qs.virtualTime += timesincelast * qs.getVirtualTimeRatio()
qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatio()
}
// getVirtualTimeRatio calculates the rate at which virtual time has
@ -328,8 +362,8 @@ func (qs *queueSet) getVirtualTimeRatio() float64 {
activeQueues := 0
reqs := 0
for _, queue := range qs.queues {
reqs += queue.RequestsExecuting
if len(queue.Requests) > 0 || queue.RequestsExecuting > 0 {
reqs += queue.requestsExecuting
if len(queue.requests) > 0 || queue.requestsExecuting > 0 {
activeQueues++
}
}
@ -361,16 +395,16 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64,
// Create a request and enqueue
req := &request{
Decision: lockingpromise.NewLockingPromise(&qs.lock, qs.counter),
ArrivalTime: qs.clock.Now(),
Queue: queue,
decision: lockingpromise.NewLockingPromise(&qs.lock, qs.counter),
arrivalTime: qs.clock.Now(),
queue: queue,
descr1: descr1,
descr2: descr2,
}
if ok := qs.rejectOrEnqueueLocked(req); !ok {
return nil
}
metrics.ObserveQueueLength(qs.config.Name, len(queue.Requests))
metrics.ObserveQueueLength(qs.config.Name, len(queue.requests))
return req
}
@ -381,13 +415,13 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
bestQueueLen := int(math.MaxInt32)
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
qs.dealer.Deal(hashValue, func(queueIdx int) {
thisLen := len(qs.queues[queueIdx].Requests)
thisLen := len(qs.queues[queueIdx].requests)
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.config.Name, descr1, descr2, queueIdx, thisLen)
if thisLen < bestQueueLen {
bestQueueIdx, bestQueueLen = queueIdx, thisLen
}
})
klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].RequestsExecuting)
klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting)
return bestQueueIdx
}
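A worked note on the shuffle-sharding choice above, as a hedged sketch: with DesiredNumQueues = 8 and HandSize = 3, each flow's hashValue deals it a hand of 3 of the 8 queues, and the request goes to the shortest queue in that hand, so a heavy flow tends not to crowd out flows whose hands land elsewhere:

	// Illustrative only: enumerate the hand dealt to one flow.
	hand := make([]int, 0, 3)
	dealer.Deal(hashValue, func(queueIdx int) { hand = append(hand, queueIdx) })
	// e.g. hand == [5, 2, 7]: this flow may only use queues 5, 2, and 7.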
@ -396,7 +430,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
timeoutIdx := -1
now := qs.clock.Now()
reqs := queue.Requests
reqs := queue.requests
// reqs are sorted oldest -> newest
// can short circuit loop (break) if oldest requests are not timing out
// as newer requests also will not have timed out
@ -404,8 +438,8 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
// now - requestWaitLimit = waitLimit
waitLimit := now.Add(-qs.config.RequestWaitLimit)
for i, req := range reqs {
if waitLimit.After(req.ArrivalTime) {
req.Decision.SetLocked(DecisionReject)
if waitLimit.After(req.arrivalTime) {
req.decision.SetLocked(decisionReject)
// get index for timed out requests
timeoutIdx = i
} else {
@ -417,19 +451,19 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
// timeoutIdx + 1 to remove the last timeout req
removeIdx := timeoutIdx + 1
// remove all the timeout requests
queue.Requests = reqs[removeIdx:]
queue.requests = reqs[removeIdx:]
// decrement the # of requestsEnqueued
qs.numRequestsEnqueued -= removeIdx
qs.totRequestsWaiting -= removeIdx
}
}
// rejectOrEnqueueLocked rejects the newly arrived request if the resource
// criteria are not met; otherwise it enqueues the request
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
queue := request.Queue
curQueueLength := len(queue.Requests)
queue := request.queue
curQueueLength := len(queue.requests)
// rejects the newly arrived request if the resource criteria are not met
if qs.getRequestsExecutingLocked() >= qs.config.ConcurrencyLimit &&
if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit &&
curQueueLength >= qs.config.QueueLengthLimit {
return false
}
@ -440,28 +474,17 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
// enqueues a request into a queueSet
func (qs *queueSet) enqueueLocked(request *request) {
queue := request.Queue
if len(queue.Requests) == 0 && queue.RequestsExecuting == 0 {
queue := request.queue
if len(queue.requests) == 0 && queue.requestsExecuting == 0 {
// the queue's virtual start time is set to the virtual time.
queue.VirtualStart = qs.virtualTime
queue.virtualStart = qs.virtualTime
if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.VirtualStart, queue.Index, request.descr1, request.descr2)
klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
}
}
queue.Enqueue(request)
qs.numRequestsEnqueued++
metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued)
}
// getRequestsExecutingLocked gets the # of requests which are "executing":
// this is the # of requests which have been dispatched but have not
// finished (via the finishRequestLocked method invoked after service)
func (qs *queueSet) getRequestsExecutingLocked() int {
total := 0
for _, queue := range qs.queues {
total += queue.RequestsExecuting
}
return total
qs.totRequestsWaiting++
metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.totRequestsWaiting)
}
// dispatchAsMuchAsPossibleLocked runs a loop, as long as there
@ -471,50 +494,70 @@ func (qs *queueSet) getRequestsExecutingLocked() int {
// queue, increment the count of the number executing, and send true
// to the request's channel.
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit {
_, ok := qs.dispatchLocked()
for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.config.ConcurrencyLimit {
ok := qs.dispatchLocked()
if !ok {
break
}
}
}
// dispatchLocked is a convenience method for dequeueing requests that
// require a message to be sent through the request's channel;
// this is a pattern required by the QueueSet interface
func (qs *queueSet) dispatchLocked() (*request, bool) {
queue := qs.selectQueueLocked()
if queue == nil {
return nil, false
func (qs *queueSet) dispatchSansQueue(descr1, descr2 interface{}) *request {
now := qs.clock.Now()
req := &request{
startTime: now,
arrivalTime: now,
descr1: descr1,
descr2: descr2,
}
request, ok := queue.Dequeue()
if !ok {
return nil, false
qs.totRequestsExecuting++
if klog.V(5) {
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.config.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting)
}
request.StartTime = qs.clock.Now()
// request dequeued, service has started
queue.RequestsExecuting++
qs.numRequestsEnqueued--
if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.StartTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.Index, queue.VirtualStart, len(queue.Requests), queue.RequestsExecuting)
}
// When a request is dequeued for service -> qs.VirtualStart += G
queue.VirtualStart += qs.estimatedServiceTime
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, queue.RequestsExecuting)
request.Decision.SetLocked(DecisionExecute)
return request, ok
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting)
return req
}
// selectQueueLocked selects the minimum virtualFinish time from the set of queues;
// the starting queue is selected via round robin
// dispatchLocked uses the Fair Queuing for Server Requests method to
// select a queue and dispatch the oldest request in that queue. The
// return value indicates whether a request was dispatched; this will
// be false when there are no requests waiting in any queue.
func (qs *queueSet) dispatchLocked() bool {
queue := qs.selectQueueLocked()
if queue == nil {
return false
}
request, ok := queue.Dequeue()
if !ok { // This should never happen. But if it does...
return false
}
request.startTime = qs.clock.Now()
// request dequeued, service has started
qs.totRequestsWaiting--
qs.totRequestsExecuting++
queue.requestsExecuting++
if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting)
}
// When a request is dequeued for service -> qs.virtualStart += G
queue.virtualStart += qs.estimatedServiceTime
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting)
request.decision.SetLocked(decisionExecute)
return ok
}
// selectQueueLocked examines the queues in round robin order and
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal.
func (qs *queueSet) selectQueueLocked() *queue {
minVirtualFinish := math.Inf(1)
var minQueue *queue
var minIndex int
nq := len(qs.queues)
for range qs.queues {
qs.robinIndex = (qs.robinIndex + 1) % len(qs.queues)
qs.robinIndex = (qs.robinIndex + 1) % nq
queue := qs.queues[qs.robinIndex]
if len(queue.Requests) != 0 {
if len(queue.requests) != 0 {
currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish
@ -546,29 +589,39 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) {
// previously dispatched request has completed its service. This
// callback updates important state in the queueSet
func (qs *queueSet) finishRequestLocked(r *request) {
S := qs.clock.Since(r.StartTime).Seconds()
qs.totRequestsExecuting--
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting)
if r.queue == nil {
if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
}
return
}
S := qs.clock.Since(r.startTime).Seconds()
// When a request finishes being served, and the actual service time was S,
// the queue's virtual start time is decremented by G - S.
r.Queue.VirtualStart -= qs.estimatedServiceTime - S
r.queue.virtualStart -= qs.estimatedServiceTime - S
// request has finished, remove from requests executing
r.Queue.RequestsExecuting--
r.queue.requestsExecuting--
if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.Queue.Index, r.Queue.VirtualStart, S, len(r.Queue.Requests), r.Queue.RequestsExecuting)
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting)
}
// Logic to remove quiesced queues
// >= as Index=25 is out of bounds for DesiredNum=25 [0...24]
if r.Queue.Index >= qs.config.DesiredNumQueues &&
len(r.Queue.Requests) == 0 &&
r.Queue.RequestsExecuting == 0 {
qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.Queue.Index)
// If there are more queues than desired and this one has no
// requests then remove it
if len(qs.queues) > qs.config.DesiredNumQueues &&
len(r.queue.requests) == 0 &&
r.queue.requestsExecuting == 0 {
qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index)
// decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues
// is the index of the next queue after the one last dispatched from
if qs.robinIndex >= r.Queue.Index {
if qs.robinIndex >= r.queue.index {
qs.robinIndex--
}
@ -580,18 +633,18 @@ func (qs *queueSet) finishRequestLocked(r *request) {
}
// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice
// and then updates the 'Index' field of the queues to be correct
// and then updates the 'index' field of the queues to be correct
func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {
keptQueues := append(queues[:index], queues[index+1:]...)
for i := index; i < len(keptQueues); i++ {
keptQueues[i].Index--
keptQueues[i].index--
}
return keptQueues
}
func (qs *queueSet) maybeForkEmptyHandlerLocked() {
if qs.emptyHandler != nil && qs.numRequestsEnqueued == 0 &&
qs.getRequestsExecutingLocked() == 0 {
if qs.emptyHandler != nil && qs.totRequestsWaiting == 0 &&
qs.totRequestsExecuting == 0 {
qs.preCreateOrUnblockGoroutine()
go func(eh fq.EmptyHandler) {
defer runtime.HandleCrash()

View File

@ -197,10 +197,34 @@ func TestDifferentFlows(t *testing.T) {
exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{
{1001001001, 6, 10, time.Second, time.Second},
{2002002002, 4, 15, time.Second, time.Second / 2},
{2002002002, 5, 15, time.Second, time.Second / 2},
}, time.Second*20, true, true, clk, counter)
}
func TestDifferentFlowsWithoutQueuing(t *testing.T) {
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
config := fq.QueueSetConfig{
Name: "TestDifferentFlowsWithoutQueuing",
ConcurrencyLimit: 4,
DesiredNumQueues: 0,
QueueLengthLimit: 6,
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
qs, err := qsf.NewQueueSet(config)
if err != nil {
t.Fatalf("QueueSet creation failed with %v", err)
}
exerciseQueueSetUniformScenario(t, "DifferentFlowsWithoutQueuing", qs, []uniformClient{
{1001001001, 6, 10, time.Second, 57 * time.Millisecond},
{2002002002, 4, 15, time.Second, 750 * time.Millisecond},
}, time.Second*13, false, false, clk, counter)
}
func TestTimeout(t *testing.T) {
now := time.Now()

View File

@ -1,5 +1,5 @@
/*
Copyright 2016 The Kubernetes Authors.
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -19,25 +19,25 @@ package queueset
import (
"time"
"k8s.io/apiserver/pkg/util/promise"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
)
// request is a temporary container for "requests" with additional tracking fields
// required for the functionality FQScheduler
// request is a temporary container for "requests" with additional
// tracking fields required for the functionality of the FQScheduler
type request struct {
Queue *queue
queue *queue
// StartTime is the clock time when the request began executing
StartTime time.Time
// startTime is the real time when the request began executing
startTime time.Time
// Decision gets set to the decision about what to do with this request
Decision promise.LockingMutable
// decision gets set to the decision about what to do with this request
decision promise.LockingMutable
// ArrivalTime is when the request entered this system
ArrivalTime time.Time
// arrivalTime is the real time when the request entered this system
arrivalTime time.Time
// IsWaiting indicates whether the request is presently waiting in a queue
IsWaiting bool
// isWaiting indicates whether the request is presently waiting in a queue
isWaiting bool
// descr1 and descr2 are not used in any logic but they appear in
// log messages
@ -47,31 +47,32 @@ type request struct {
// queue is an array of requests with additional metadata required for
// the FQScheduler
type queue struct {
Requests []*request
requests []*request
// VirtualStart is the virtual time when the oldest request in the
// queue (if there is any) started virtually executing
VirtualStart float64
// virtualStart is the virtual time (virtual seconds since process
// startup) when the oldest request in the queue (if there is any)
// started virtually executing
virtualStart float64
RequestsExecuting int
Index int
requestsExecuting int
index int
}
// Enqueue enqueues a request into the queue
func (q *queue) Enqueue(request *request) {
request.IsWaiting = true
q.Requests = append(q.Requests, request)
request.isWaiting = true
q.requests = append(q.requests, request)
}
// Dequeue dequeues a request from the queue
func (q *queue) Dequeue() (*request, bool) {
if len(q.Requests) == 0 {
if len(q.requests) == 0 {
return nil, false
}
request := q.Requests[0]
q.Requests = q.Requests[1:]
request := q.requests[0]
q.requests = q.requests[1:]
request.IsWaiting = false
request.isWaiting = false
return request, true
}
@ -81,7 +82,7 @@ func (q *queue) GetVirtualFinish(J int, G float64) float64 {
// The virtual finish time of request number J in the queue
// (counting from J=1 for the head) is J * G + (virtual start time).
// counting from J=1 for the head (eg: queue.Requests[0] -> J=1) - J+1
// counting from J=1 for the head (eg: queue.requests[0] -> J=1) - J+1
jg := float64(J+1) * float64(G)
return jg + q.VirtualStart
return jg + q.virtualStart
}
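A worked instance of this formula, as a hedged sketch with illustrative numbers:

	// With virtualStart = 100 virtual seconds and G = 60:
	q := &queue{virtualStart: 100}
	head := q.GetVirtualFinish(0, 60) // slice index 0 is J=1: 1*60 + 100 = 160
	next := q.GetVirtualFinish(1, 60) // slice index 1 is J=2: 2*60 + 100 = 220
	_, _ = head, next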