review changes - *Locked updates
Kubernetes-commit: 6619df1798859d49bbb52b1c029533035384824e
This commit is contained in:
		
							parent
							
								
									572fbfc84d
								
							
						
					
					
						commit
						e231e56df2
					
				|  | @ -130,7 +130,7 @@ func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error { | ||||||
| 	qs.config = config | 	qs.config = config | ||||||
| 	qs.dealer = dealer | 	qs.dealer = dealer | ||||||
| 
 | 
 | ||||||
| 	qs.dequeueWithChannelAsMuchAsPossible() | 	qs.dequeueWithChannelLockedAsMuchAsPossibleLocked() | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -206,7 +206,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64) (tryAnother, exe | ||||||
| 		// technique to pick a queue, dequeue the request at the head of that
 | 		// technique to pick a queue, dequeue the request at the head of that
 | ||||||
| 		// queue, increment the count of the number executing, and send true to
 | 		// queue, increment the count of the number executing, and send true to
 | ||||||
| 		// the request's channel.
 | 		// the request's channel.
 | ||||||
| 		qs.dequeueWithChannelAsMuchAsPossible() | 		qs.dequeueWithChannelLockedAsMuchAsPossibleLocked() | ||||||
| 		return false, false, false, func() {} | 		return false, false, false, func() {} | ||||||
| 	}() | 	}() | ||||||
| 	if shouldReturn { | 	if shouldReturn { | ||||||
|  | @ -390,10 +390,10 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64) int { | ||||||
| 	return bestQueueIdx | 	return bestQueueIdx | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // updateQueueVirtualStartTime updates the virtual start time for a queue
 | // updateQueueVirtualStartTimeLocked updates the virtual start time for a queue
 | ||||||
| // this is done when a new request is enqueued.  For more info see:
 | // this is done when a new request is enqueued.  For more info see:
 | ||||||
| // https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md#dispatching
 | // https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md#dispatching
 | ||||||
| func (qs *queueSet) updateQueueVirtualStartTime(request *fq.Request, queue *fq.Queue) { | func (qs *queueSet) updateQueueVirtualStartTimeLocked(request *fq.Request, queue *fq.Queue) { | ||||||
| 	// When a request arrives to an empty queue with no requests executing:
 | 	// When a request arrives to an empty queue with no requests executing:
 | ||||||
| 	// len(queue.Requests) == 1 as enqueue has just happened prior (vs  == 0)
 | 	// len(queue.Requests) == 1 as enqueue has just happened prior (vs  == 0)
 | ||||||
| 	if len(queue.Requests) == 1 && queue.RequestsExecuting == 0 { | 	if len(queue.Requests) == 1 && queue.RequestsExecuting == 0 { | ||||||
|  | @ -403,10 +403,10 @@ func (qs *queueSet) updateQueueVirtualStartTime(request *fq.Request, queue *fq.Q | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // enqueues a request into an queueSet
 | // enqueues a request into an queueSet
 | ||||||
| func (qs *queueSet) enqueue(request *fq.Request) { | func (qs *queueSet) enqueueLocked(request *fq.Request) { | ||||||
| 	queue := request.Queue | 	queue := request.Queue | ||||||
| 	queue.Enqueue(request) | 	queue.Enqueue(request) | ||||||
| 	qs.updateQueueVirtualStartTime(request, queue) | 	qs.updateQueueVirtualStartTimeLocked(request, queue) | ||||||
| 	qs.numRequestsEnqueued++ | 	qs.numRequestsEnqueued++ | ||||||
| 
 | 
 | ||||||
| 	metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued) | 	metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued) | ||||||
|  | @ -423,13 +423,13 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *fq.Request) bool { | ||||||
| 		return false | 		return false | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	qs.enqueue(request) | 	qs.enqueueLocked(request) | ||||||
| 	return true | 	return true | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // selectQueue selects the minimum virtualFinish time from the set of queues
 | // selectQueueLocked selects the minimum virtualFinish time from the set of queues
 | ||||||
| // the starting queue is selected via roundrobin
 | // the starting queue is selected via roundrobin
 | ||||||
| func (qs *queueSet) selectQueue() *fq.Queue { | func (qs *queueSet) selectQueueLocked() *fq.Queue { | ||||||
| 	minVirtualFinish := math.Inf(1) | 	minVirtualFinish := math.Inf(1) | ||||||
| 	var minQueue *fq.Queue | 	var minQueue *fq.Queue | ||||||
| 	var minIndex int | 	var minIndex int | ||||||
|  | @ -454,7 +454,7 @@ func (qs *queueSet) selectQueue() *fq.Queue { | ||||||
| 
 | 
 | ||||||
| // dequeue dequeues a request from the queueSet
 | // dequeue dequeues a request from the queueSet
 | ||||||
| func (qs *queueSet) dequeueLocked() (*fq.Request, bool) { | func (qs *queueSet) dequeueLocked() (*fq.Request, bool) { | ||||||
| 	queue := qs.selectQueue() | 	queue := qs.selectQueueLocked() | ||||||
| 	if queue == nil { | 	if queue == nil { | ||||||
| 		return nil, false | 		return nil, false | ||||||
| 	} | 	} | ||||||
|  | @ -472,25 +472,25 @@ func (qs *queueSet) dequeueLocked() (*fq.Request, bool) { | ||||||
| 	return request, ok | 	return request, ok | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // dequeueWithChannelAsMuchAsPossible runs a loop, as long as there
 | // dequeueWithChannelLockedAsMuchAsPossibleLocked runs a loop, as long as there
 | ||||||
| // are non-empty queues and the number currently executing is less than the
 | // are non-empty queues and the number currently executing is less than the
 | ||||||
| // assured concurrency value.  The body of the loop uses the fair queuing
 | // assured concurrency value.  The body of the loop uses the fair queuing
 | ||||||
| // technique to pick a queue, dequeue the request at the head of that
 | // technique to pick a queue, dequeue the request at the head of that
 | ||||||
| // queue, increment the count of the number executing, and send true
 | // queue, increment the count of the number executing, and send true
 | ||||||
| // to the request's channel.
 | // to the request's channel.
 | ||||||
| func (qs *queueSet) dequeueWithChannelAsMuchAsPossible() { | func (qs *queueSet) dequeueWithChannelLockedAsMuchAsPossibleLocked() { | ||||||
| 	for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit { | 	for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit { | ||||||
| 		_, ok := qs.dequeueWithChannel() | 		_, ok := qs.dequeueWithChannelLocked() | ||||||
| 		if !ok { | 		if !ok { | ||||||
| 			break | 			break | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // dequeueWithChannel is a convenience method for dequeueing requests that
 | // dequeueWithChannelLocked is a convenience method for dequeueing requests that
 | ||||||
| // require a message to be sent through the requests channel
 | // require a message to be sent through the requests channel
 | ||||||
| // this is a required pattern for the QueueSet the queueSet supports
 | // this is a required pattern for the QueueSet the queueSet supports
 | ||||||
| func (qs *queueSet) dequeueWithChannel() (*fq.Request, bool) { | func (qs *queueSet) dequeueWithChannelLocked() (*fq.Request, bool) { | ||||||
| 	req, ok := qs.dequeueLocked() | 	req, ok := qs.dequeueLocked() | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		return nil, false | 		return nil, false | ||||||
|  | @ -565,5 +565,5 @@ func (qs *queueSet) finishRequestAndDequeueWithChannelAsMuchAsPossible(req *fq.R | ||||||
| 	defer qs.lock.Unlock() | 	defer qs.lock.Unlock() | ||||||
| 
 | 
 | ||||||
| 	qs.finishRequestLocked(req) | 	qs.finishRequestLocked(req) | ||||||
| 	qs.dequeueWithChannelAsMuchAsPossible() | 	qs.dequeueWithChannelLockedAsMuchAsPossibleLocked() | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue