apf: add "width" for request
all requests have a width of 1 to maintain current behavior. Kubernetes-commit: b50507d98bd12503592ea62d2be2aadef49bdf70
This commit is contained in:
		
							parent
							
								
									1f0a859e77
								
							
						
					
					
						commit
						ec22c8bdd8
					
				|  | @ -27,6 +27,7 @@ type QueueSetDump struct { | |||
| 	Queues     []QueueDump | ||||
| 	Waiting    int | ||||
| 	Executing  int | ||||
| 	SeatsInUse int | ||||
| } | ||||
| 
 | ||||
| // QueueDump is an instant dump of one queue in a queue-set.
 | ||||
|  | @ -34,6 +35,7 @@ type QueueDump struct { | |||
| 	Requests          []RequestDump | ||||
| 	VirtualStart      float64 | ||||
| 	ExecutingRequests int | ||||
| 	SeatsInUse        int | ||||
| } | ||||
| 
 | ||||
| // RequestDump is an instant dump of one requests pending in the queue.
 | ||||
|  |  | |||
|  | @ -108,6 +108,10 @@ type queueSet struct { | |||
| 	// sum, over all the queues, of the number of requests executing
 | ||||
| 	// from that queue.
 | ||||
| 	totRequestsExecuting int | ||||
| 
 | ||||
| 	// totSeatsInUse is the number of total "seats" in use by all the
 | ||||
| 	// request(s) that are currently executing in this queueset.
 | ||||
| 	totSeatsInUse int | ||||
| } | ||||
| 
 | ||||
| // NewQueueSetFactory creates a new QueueSetFactory object
 | ||||
|  | @ -233,6 +237,9 @@ const ( | |||
| // because the metrics --- and only the metrics --- track that
 | ||||
| // quantity per FlowSchema.
 | ||||
| func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { | ||||
| 	// all request(s) have a width of 1, in keeping with the current behavior
 | ||||
| 	width := 1.0 | ||||
| 
 | ||||
| 	qs.lockAndSyncTime() | ||||
| 	defer qs.lock.Unlock() | ||||
| 	var req *request | ||||
|  | @ -241,12 +248,13 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist | |||
| 	// Step 0:
 | ||||
| 	// Apply only concurrency limit, if zero queues desired
 | ||||
| 	if qs.qCfg.DesiredNumQueues < 1 { | ||||
| 		if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit { | ||||
| 			klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) | ||||
| 		if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit { | ||||
| 			klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are in use (%d are executing) and the limit is %d", | ||||
| 				qs.qCfg.Name, fsName, descr1, descr2, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) | ||||
| 			metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit") | ||||
| 			return nil, qs.isIdleLocked() | ||||
| 		} | ||||
| 		req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2) | ||||
| 		req = qs.dispatchSansQueueLocked(ctx, width, flowDistinguisher, fsName, descr1, descr2) | ||||
| 		return req, false | ||||
| 	} | ||||
| 
 | ||||
|  | @ -257,7 +265,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist | |||
| 	// 3) Reject current request if there is not enough concurrency shares and
 | ||||
| 	// we are at max queue length
 | ||||
| 	// 4) If not rejected, create a request and enqueue
 | ||||
| 	req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn) | ||||
| 	req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, width, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn) | ||||
| 	// req == nil means that the request was rejected - no remaining
 | ||||
| 	// concurrency shares and at max queue length already
 | ||||
| 	if req == nil { | ||||
|  | @ -310,6 +318,11 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist | |||
| 	return req, false | ||||
| } | ||||
| 
 | ||||
| // Seats returns the number of seats this request requires.
 | ||||
| func (req *request) Seats() int { | ||||
| 	return int(math.Ceil(req.width)) | ||||
| } | ||||
| 
 | ||||
| func (req *request) NoteQueued(inQueue bool) { | ||||
| 	if req.queueNoteFn != nil { | ||||
| 		req.queueNoteFn(inQueue) | ||||
|  | @ -427,7 +440,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 { | |||
| // returns the enqueud request on a successful enqueue
 | ||||
| // returns nil in the case that there is no available concurrency or
 | ||||
| // the queuelengthlimit has been reached
 | ||||
| func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request { | ||||
| func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width float64, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request { | ||||
| 	//	Start with the shuffle sharding, to pick a queue.
 | ||||
| 	queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2) | ||||
| 	queue := qs.queues[queueIdx] | ||||
|  | @ -449,6 +462,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte | |||
| 		descr1:            descr1, | ||||
| 		descr2:            descr2, | ||||
| 		queueNoteFn:       queueNoteFn, | ||||
| 		width:             width, | ||||
| 	} | ||||
| 	if ok := qs.rejectOrEnqueueLocked(req); !ok { | ||||
| 		return nil | ||||
|  | @ -522,7 +536,7 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { | |||
| 	queue := request.queue | ||||
| 	curQueueLength := queue.requests.Length() | ||||
| 	// rejects the newly arrived request if resource criteria not met
 | ||||
| 	if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit && | ||||
| 	if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit && | ||||
| 		curQueueLength >= qs.qCfg.QueueLengthLimit { | ||||
| 		return false | ||||
| 	} | ||||
|  | @ -556,7 +570,7 @@ func (qs *queueSet) enqueueLocked(request *request) { | |||
| // queue, increment the count of the number executing, and send true
 | ||||
| // to the request's channel.
 | ||||
| func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { | ||||
| 	for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.dCfg.ConcurrencyLimit { | ||||
| 	for qs.totRequestsWaiting != 0 && qs.totSeatsInUse < qs.dCfg.ConcurrencyLimit { | ||||
| 		ok := qs.dispatchLocked() | ||||
| 		if !ok { | ||||
| 			break | ||||
|  | @ -564,7 +578,7 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { | ||||
| func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width float64, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { | ||||
| 	now := qs.clock.Now() | ||||
| 	req := &request{ | ||||
| 		qs:                qs, | ||||
|  | @ -576,9 +590,11 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguish | |||
| 		arrivalTime:       now, | ||||
| 		descr1:            descr1, | ||||
| 		descr2:            descr2, | ||||
| 		width:             width, | ||||
| 	} | ||||
| 	req.decision.SetLocked(decisionExecute) | ||||
| 	qs.totRequestsExecuting++ | ||||
| 	qs.totSeatsInUse += req.Seats() | ||||
| 	metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) | ||||
| 	qs.obsPair.RequestsExecuting.Add(1) | ||||
| 	if klog.V(5).Enabled() { | ||||
|  | @ -608,7 +624,9 @@ func (qs *queueSet) dispatchLocked() bool { | |||
| 	// problem because other overhead is also included.
 | ||||
| 	qs.totRequestsWaiting-- | ||||
| 	qs.totRequestsExecuting++ | ||||
| 	qs.totSeatsInUse += request.Seats() | ||||
| 	queue.requestsExecuting++ | ||||
| 	queue.seatsInUse += request.Seats() | ||||
| 	metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) | ||||
| 	request.NoteQueued(false) | ||||
| 	metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1) | ||||
|  | @ -620,7 +638,7 @@ func (qs *queueSet) dispatchLocked() bool { | |||
| 			queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting) | ||||
| 	} | ||||
| 	// When a request is dequeued for service -> qs.virtualStart += G
 | ||||
| 	queue.virtualStart += qs.estimatedServiceTime | ||||
| 	queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats()) | ||||
| 	request.decision.SetLocked(decisionExecute) | ||||
| 	return ok | ||||
| } | ||||
|  | @ -682,7 +700,7 @@ func (qs *queueSet) selectQueueLocked() *queue { | |||
| 	// queue here. if the last virtual start time (excluded estimated cost)
 | ||||
| 	// falls behind the global virtual time, we update the latest virtual
 | ||||
| 	// start by: <latest global virtual time> + <previously estimated cost>
 | ||||
| 	previouslyEstimatedServiceTime := float64(minQueue.requestsExecuting) * qs.estimatedServiceTime | ||||
| 	previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime | ||||
| 	if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime { | ||||
| 		// per-queue virtual time should not fall behind the global
 | ||||
| 		minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime | ||||
|  | @ -710,6 +728,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool | |||
| func (qs *queueSet) finishRequestLocked(r *request) { | ||||
| 	now := qs.clock.Now() | ||||
| 	qs.totRequestsExecuting-- | ||||
| 	qs.totSeatsInUse -= r.Seats() | ||||
| 	metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1) | ||||
| 	qs.obsPair.RequestsExecuting.Add(-1) | ||||
| 
 | ||||
|  | @ -724,10 +743,11 @@ func (qs *queueSet) finishRequestLocked(r *request) { | |||
| 
 | ||||
| 	// When a request finishes being served, and the actual service time was S,
 | ||||
| 	// the queue’s virtual start time is decremented by G - S.
 | ||||
| 	r.queue.virtualStart -= qs.estimatedServiceTime - S | ||||
| 	r.queue.virtualStart -= (qs.estimatedServiceTime * float64(r.Seats())) - S | ||||
| 
 | ||||
| 	// request has finished, remove from requests executing
 | ||||
| 	r.queue.requestsExecuting-- | ||||
| 	r.queue.seatsInUse -= r.Seats() | ||||
| 
 | ||||
| 	if klog.V(6).Enabled() { | ||||
| 		klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", | ||||
|  | @ -787,6 +807,7 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { | |||
| 		Queues:     make([]debug.QueueDump, len(qs.queues)), | ||||
| 		Waiting:    qs.totRequestsWaiting, | ||||
| 		Executing:  qs.totRequestsExecuting, | ||||
| 		SeatsInUse: qs.totSeatsInUse, | ||||
| 	} | ||||
| 	for i, q := range qs.queues { | ||||
| 		d.Queues[i] = q.dump(includeRequestDetails) | ||||
|  |  | |||
|  | @ -43,6 +43,9 @@ type request struct { | |||
| 	// startTime is the real time when the request began executing
 | ||||
| 	startTime time.Time | ||||
| 
 | ||||
| 	// width of the request
 | ||||
| 	width float64 | ||||
| 
 | ||||
| 	// decision gets set to a `requestDecision` indicating what to do
 | ||||
| 	// with this request.  It gets set exactly once, when the request
 | ||||
| 	// is removed from its queue.  The value will be decisionReject,
 | ||||
|  | @ -80,6 +83,10 @@ type queue struct { | |||
| 
 | ||||
| 	requestsExecuting int | ||||
| 	index             int | ||||
| 
 | ||||
| 	// seatsInUse is the total number of "seats" currently occupied
 | ||||
| 	// by all the requests that are currently executing in this queue.
 | ||||
| 	seatsInUse int | ||||
| } | ||||
| 
 | ||||
| // Enqueue enqueues a request into the queue and
 | ||||
|  | @ -129,5 +136,6 @@ func (q *queue) dump(includeDetails bool) debug.QueueDump { | |||
| 		VirtualStart:      q.virtualStart, | ||||
| 		Requests:          digest, | ||||
| 		ExecutingRequests: q.requestsExecuting, | ||||
| 		SeatsInUse:        q.seatsInUse, | ||||
| 	} | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue