Use SeatSeconds
Kubernetes-commit: 4b5e1398199282f471d0f332eefeb5c2415bdb01
This commit is contained in:
		
							parent
							
								
									0cb46ec2f7
								
							
						
					
					
						commit
						dc449969cc
					
				|  | @ -76,9 +76,9 @@ type queueSetCompleter struct { | |||
| // not end in "Locked" either acquires the lock or does not care about
 | ||||
| // locking.
 | ||||
| type queueSet struct { | ||||
| 	clock                   eventclock.Interface | ||||
| 	estimatedServiceSeconds float64 | ||||
| 	obsPair                 metrics.TimedObserverPair | ||||
| 	clock                    eventclock.Interface | ||||
| 	estimatedServiceDuration time.Duration | ||||
| 	obsPair                  metrics.TimedObserverPair | ||||
| 
 | ||||
| 	promiseFactory promiseFactory | ||||
| 
 | ||||
|  | @ -102,9 +102,9 @@ type queueSet struct { | |||
| 	// queues are still draining.
 | ||||
| 	queues []*queue | ||||
| 
 | ||||
| 	// virtualTime is the amount of seat-seconds allocated per queue since process startup.
 | ||||
| 	// currentR is the amount of seat-seconds allocated per queue since process startup.
 | ||||
| 	// This is our generalization of the progress meter named R in the original fair queuing work.
 | ||||
| 	virtualTime float64 | ||||
| 	currentR SeatSeconds | ||||
| 
 | ||||
| 	// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
 | ||||
| 	lastRealTime time.Time | ||||
|  | @ -173,12 +173,12 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet { | |||
| 	qs := qsc.theSet | ||||
| 	if qs == nil { | ||||
| 		qs = &queueSet{ | ||||
| 			clock:                   qsc.factory.clock, | ||||
| 			estimatedServiceSeconds: 0.003, | ||||
| 			obsPair:                 qsc.obsPair, | ||||
| 			qCfg:                    qsc.qCfg, | ||||
| 			virtualTime:             0, | ||||
| 			lastRealTime:            qsc.factory.clock.Now(), | ||||
| 			clock:                    qsc.factory.clock, | ||||
| 			estimatedServiceDuration: 3 * time.Millisecond, | ||||
| 			obsPair:                  qsc.obsPair, | ||||
| 			qCfg:                     qsc.qCfg, | ||||
| 			currentR:                 0, | ||||
| 			lastRealTime:             qsc.factory.clock.Now(), | ||||
| 		} | ||||
| 		qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs) | ||||
| 	} | ||||
|  | @ -407,10 +407,14 @@ func (qs *queueSet) lockAndSyncTime() { | |||
| // lock and before modifying the state of any queue.
 | ||||
| func (qs *queueSet) syncTimeLocked() { | ||||
| 	realNow := qs.clock.Now() | ||||
| 	timeSinceLast := realNow.Sub(qs.lastRealTime).Seconds() | ||||
| 	timeSinceLast := realNow.Sub(qs.lastRealTime) | ||||
| 	qs.lastRealTime = realNow | ||||
| 	qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatioLocked() | ||||
| 	metrics.SetCurrentR(qs.qCfg.Name, qs.virtualTime) | ||||
| 	prevR := qs.currentR | ||||
| 	qs.currentR += SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast) | ||||
| 	if qs.currentR < prevR { | ||||
| 		klog.ErrorS(errors.New("progress meter wrapped around"), "Wrap", "QS", qs.qCfg.Name, "prevR", prevR, "currentR", qs.currentR) | ||||
| 	} | ||||
| 	metrics.SetCurrentR(qs.qCfg.Name, qs.currentR.ToFloat()) | ||||
| } | ||||
| 
 | ||||
| // getVirtualTimeRatio calculates the rate at which virtual time has
 | ||||
|  | @ -460,7 +464,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte | |||
| 		ctx:               ctx, | ||||
| 		decision:          qs.promiseFactory(nil, ctx.Done(), decisionCancel), | ||||
| 		arrivalTime:       qs.clock.Now(), | ||||
| 		arrivalR:          qs.virtualTime, | ||||
| 		arrivalR:          qs.currentR, | ||||
| 		queue:             queue, | ||||
| 		descr1:            descr1, | ||||
| 		descr2:            descr2, | ||||
|  | @ -495,7 +499,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac | |||
| 		// Ideally, this should be based on projected completion time in the
 | ||||
| 		// virtual world of the youngest request in the queue.
 | ||||
| 		thisSeatsSum := waitingSeats + queue.seatsInUse | ||||
| 		klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing, virtualStart=%vss", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse, queue.virtualStart) | ||||
| 		klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse, queue.nextDispatchR) | ||||
| 		if thisSeatsSum < bestQueueSeatsSum { | ||||
| 			bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum | ||||
| 		} | ||||
|  | @ -503,7 +507,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac | |||
| 	} | ||||
| 	if klog.V(6).Enabled() { | ||||
| 		chosenQueue := qs.queues[bestQueueIdx] | ||||
| 		klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing & virtualStart=%vss", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, chosenQueue.requests.SeatsSum(), chosenQueue.requestsExecuting, chosenQueue.virtualStart) | ||||
| 		klog.V(6).Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing & virtualStart=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requests.SeatsSum(), chosenQueue.requestsExecuting, chosenQueue.nextDispatchR) | ||||
| 	} | ||||
| 	return bestQueueIdx | ||||
| } | ||||
|  | @ -571,9 +575,9 @@ func (qs *queueSet) enqueueLocked(request *request) { | |||
| 	now := qs.clock.Now() | ||||
| 	if queue.requests.Length() == 0 && queue.requestsExecuting == 0 { | ||||
| 		// the queue’s start R is set to the virtual time.
 | ||||
| 		queue.virtualStart = qs.virtualTime | ||||
| 		queue.nextDispatchR = qs.currentR | ||||
| 		if klog.V(6).Enabled() { | ||||
| 			klog.Infof("QS(%s) at r=%s v=%.9fss: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) | ||||
| 			klog.Infof("QS(%s) at t=%s R=%v: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.nextDispatchR, queue.index, request.descr1, request.descr2) | ||||
| 		} | ||||
| 	} | ||||
| 	queue.Enqueue(request) | ||||
|  | @ -609,7 +613,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f | |||
| 		startTime:         now, | ||||
| 		decision:          qs.promiseFactory(decisionExecute, ctx.Done(), decisionCancel), | ||||
| 		arrivalTime:       now, | ||||
| 		arrivalR:          qs.virtualTime, | ||||
| 		arrivalR:          qs.currentR, | ||||
| 		descr1:            descr1, | ||||
| 		descr2:            descr2, | ||||
| 		workEstimate:      *workEstimate, | ||||
|  | @ -620,7 +624,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f | |||
| 	metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats()) | ||||
| 	qs.obsPair.RequestsExecuting.Add(1) | ||||
| 	if klog.V(5).Enabled() { | ||||
| 		klog.Infof("QS(%s) at r=%s v=%.9fss: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) | ||||
| 		klog.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting) | ||||
| 	} | ||||
| 	return req | ||||
| } | ||||
|  | @ -656,12 +660,12 @@ func (qs *queueSet) dispatchLocked() bool { | |||
| 	qs.obsPair.RequestsWaiting.Add(-1) | ||||
| 	qs.obsPair.RequestsExecuting.Add(1) | ||||
| 	if klog.V(6).Enabled() { | ||||
| 		klog.Infof("QS(%s) at r=%s v=%.9fss: dispatching request %#+v %#+v work %v from queue %d with start R %.9fss, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied", | ||||
| 			qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, | ||||
| 			request.workEstimate, queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse) | ||||
| 		klog.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied", | ||||
| 			qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2, | ||||
| 			request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse) | ||||
| 	} | ||||
| 	// When a request is dequeued for service -> qs.virtualStart += G * width
 | ||||
| 	queue.virtualStart += qs.estimatedServiceSeconds * float64(request.Seats()) | ||||
| 	queue.nextDispatchR += SeatsTimesDuration(float64(request.Seats()), qs.estimatedServiceDuration) | ||||
| 	qs.boundNextDispatch(queue) | ||||
| 	request.decision.Set(decisionExecute) | ||||
| 	return ok | ||||
|  | @ -694,11 +698,11 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool { | |||
| // returns the first one of those for which the virtual finish time of
 | ||||
| // the oldest waiting request is minimal.
 | ||||
| func (qs *queueSet) findDispatchQueueLocked() *queue { | ||||
| 	minVirtualFinish := math.Inf(1) | ||||
| 	sMin := math.Inf(1) | ||||
| 	dsMin := math.Inf(1) | ||||
| 	sMax := math.Inf(-1) | ||||
| 	dsMax := math.Inf(-1) | ||||
| 	minVirtualFinish := MaxSeatSeconds | ||||
| 	sMin := MaxSeatSeconds | ||||
| 	dsMin := MaxSeatSeconds | ||||
| 	sMax := MinSeatSeconds | ||||
| 	dsMax := MinSeatSeconds | ||||
| 	var minQueue *queue | ||||
| 	var minIndex int | ||||
| 	nq := len(qs.queues) | ||||
|  | @ -707,12 +711,12 @@ func (qs *queueSet) findDispatchQueueLocked() *queue { | |||
| 		queue := qs.queues[qs.robinIndex] | ||||
| 		oldestWaiting, _ := queue.requests.Peek() | ||||
| 		if oldestWaiting != nil { | ||||
| 			sMin = math.Min(sMin, queue.virtualStart) | ||||
| 			sMax = math.Max(sMax, queue.virtualStart) | ||||
| 			estimatedWorkInProgress := qs.estimatedServiceSeconds * float64(queue.seatsInUse) | ||||
| 			dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress) | ||||
| 			dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress) | ||||
| 			currentVirtualFinish := queue.virtualStart + qs.estimatedServiceSeconds*float64(oldestWaiting.Seats()) | ||||
| 			sMin = ssMin(sMin, queue.nextDispatchR) | ||||
| 			sMax = ssMax(sMax, queue.nextDispatchR) | ||||
| 			estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration) | ||||
| 			dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress) | ||||
| 			dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress) | ||||
| 			currentVirtualFinish := queue.nextDispatchR + SeatsTimesDuration(float64(oldestWaiting.Seats()), qs.estimatedServiceDuration) | ||||
| 			klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish) | ||||
| 			if currentVirtualFinish < minVirtualFinish { | ||||
| 				minVirtualFinish = currentVirtualFinish | ||||
|  | @ -743,13 +747,27 @@ func (qs *queueSet) findDispatchQueueLocked() *queue { | |||
| 	// win in the case that the virtual finish times are the same
 | ||||
| 	qs.robinIndex = minIndex | ||||
| 
 | ||||
| 	if minQueue.virtualStart < oldestReqFromMinQueue.arrivalR { | ||||
| 		klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.virtualStart, "request", oldestReqFromMinQueue) | ||||
| 	if minQueue.nextDispatchR < oldestReqFromMinQueue.arrivalR { | ||||
| 		klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.nextDispatchR, "request", oldestReqFromMinQueue) | ||||
| 	} | ||||
| 	metrics.SetDispatchMetrics(qs.qCfg.Name, qs.virtualTime, minQueue.virtualStart, sMin, sMax, dsMin, dsMax) | ||||
| 	metrics.SetDispatchMetrics(qs.qCfg.Name, qs.currentR.ToFloat(), minQueue.nextDispatchR.ToFloat(), sMin.ToFloat(), sMax.ToFloat(), dsMin.ToFloat(), dsMax.ToFloat()) | ||||
| 	return minQueue | ||||
| } | ||||
| 
 | ||||
| func ssMin(a, b SeatSeconds) SeatSeconds { | ||||
| 	if a > b { | ||||
| 		return b | ||||
| 	} | ||||
| 	return a | ||||
| } | ||||
| 
 | ||||
| func ssMax(a, b SeatSeconds) SeatSeconds { | ||||
| 	if a < b { | ||||
| 		return b | ||||
| 	} | ||||
| 	return a | ||||
| } | ||||
| 
 | ||||
| // finishRequestAndDispatchAsMuchAsPossible is a convenience method
 | ||||
| // which calls finishRequest for a given request and then dispatches
 | ||||
| // as many requests as possible.  This is all of what needs to be done
 | ||||
|  | @ -773,7 +791,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { | |||
| 	metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1) | ||||
| 	qs.obsPair.RequestsExecuting.Add(-1) | ||||
| 
 | ||||
| 	S := now.Sub(r.startTime).Seconds() | ||||
| 	actualServiceDuration := now.Sub(r.startTime) | ||||
| 
 | ||||
| 	// TODO: for now we keep the logic localized so it is easier to see
 | ||||
| 	//  how the counters are tracked for queueset and queue, in future we
 | ||||
|  | @ -794,11 +812,11 @@ func (qs *queueSet) finishRequestLocked(r *request) { | |||
| 			releaseSeatsLocked() | ||||
| 			if !klog.V(6).Enabled() { | ||||
| 			} else if r.queue != nil { | ||||
| 				klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats", | ||||
| 					qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, | ||||
| 					r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) | ||||
| 				klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats", | ||||
| 					qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, | ||||
| 					r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) | ||||
| 			} else { | ||||
| 				klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) | ||||
| 				klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) | ||||
| 			} | ||||
| 			return | ||||
| 		} | ||||
|  | @ -806,11 +824,11 @@ func (qs *queueSet) finishRequestLocked(r *request) { | |||
| 		additionalLatency := r.workEstimate.AdditionalLatency | ||||
| 		if !klog.V(6).Enabled() { | ||||
| 		} else if r.queue != nil { | ||||
| 			klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing", | ||||
| 				qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), r.queue.index, | ||||
| 				r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse) | ||||
| 			klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing", | ||||
| 				qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), r.queue.index, | ||||
| 				r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse) | ||||
| 		} else { | ||||
| 			klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse) | ||||
| 			klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse) | ||||
| 		} | ||||
| 		// EventAfterDuration will execute the event func in a new goroutine,
 | ||||
| 		// so the seats allocated to this request will be released after
 | ||||
|  | @ -823,11 +841,11 @@ func (qs *queueSet) finishRequestLocked(r *request) { | |||
| 			releaseSeatsLocked() | ||||
| 			if !klog.V(6).Enabled() { | ||||
| 			} else if r.queue != nil { | ||||
| 				klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats", | ||||
| 					qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, | ||||
| 				klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats", | ||||
| 					qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index, | ||||
| 					r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse) | ||||
| 			} else { | ||||
| 				klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) | ||||
| 				klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse) | ||||
| 			} | ||||
| 			qs.dispatchAsMuchAsPossibleLocked() | ||||
| 		}, additionalLatency) | ||||
|  | @ -839,7 +857,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { | |||
| 
 | ||||
| 		// When a request finishes being served, and the actual service time was S,
 | ||||
| 		// the queue’s start R is decremented by (G - S)*width.
 | ||||
| 		r.queue.virtualStart -= (qs.estimatedServiceSeconds - S) * float64(r.Seats()) | ||||
| 		r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.Seats()), qs.estimatedServiceDuration-actualServiceDuration) | ||||
| 		qs.boundNextDispatch(r.queue) | ||||
| 	} | ||||
| } | ||||
|  | @ -857,11 +875,11 @@ func (qs *queueSet) boundNextDispatch(queue *queue) { | |||
| 		return | ||||
| 	} | ||||
| 	var virtualStartBound = oldestReqFromMinQueue.arrivalR | ||||
| 	if queue.virtualStart < virtualStartBound { | ||||
| 	if queue.nextDispatchR < virtualStartBound { | ||||
| 		if klog.V(4).Enabled() { | ||||
| 			klog.InfoS("AntiWindup tweaked queue", "QS", qs.qCfg.Name, "queue", queue.index, "time", qs.clock.Now().Format(nsTimeFmt), "requestDescr1", oldestReqFromMinQueue.descr1, "requestDescr2", oldestReqFromMinQueue.descr2, "newVirtualStart", virtualStartBound, "deltaVirtualStart", (virtualStartBound - queue.virtualStart)) | ||||
| 			klog.InfoS("AntiWindup tweaked queue", "QS", qs.qCfg.Name, "queue", queue.index, "time", qs.clock.Now().Format(nsTimeFmt), "requestDescr1", oldestReqFromMinQueue.descr1, "requestDescr2", oldestReqFromMinQueue.descr2, "newVirtualStart", virtualStartBound, "deltaVirtualStart", (virtualStartBound - queue.nextDispatchR)) | ||||
| 		} | ||||
| 		queue.virtualStart = virtualStartBound | ||||
| 		queue.nextDispatchR = virtualStartBound | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -247,7 +247,7 @@ type uniformScenarioThread struct { | |||
| } | ||||
| 
 | ||||
| func (ust *uniformScenarioThread) start() { | ||||
| 	initialDelay := time.Duration(11*ust.j + 2*ust.i) | ||||
| 	initialDelay := time.Duration(90*ust.j + 20*ust.i) | ||||
| 	if ust.uc.split && ust.j >= ust.uc.nThreads/2 { | ||||
| 		initialDelay += ust.uss.evalDuration / 2 | ||||
| 		ust.nCalls = ust.nCalls / 2 | ||||
|  | @ -601,7 +601,7 @@ func TestUniformFlowsHandSize3(t *testing.T) { | |||
| 		concurrencyLimit:       4, | ||||
| 		evalDuration:           time.Second * 60, | ||||
| 		expectedFair:           []bool{true}, | ||||
| 		expectedFairnessMargin: []float64{0.01}, | ||||
| 		expectedFairnessMargin: []float64{0.03}, | ||||
| 		expectAllRequests:      true, | ||||
| 		evalInqueueMetrics:     true, | ||||
| 		evalExecutingMetrics:   true, | ||||
|  | @ -647,6 +647,46 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { | |||
| 	}.exercise(t) | ||||
| } | ||||
| 
 | ||||
| // TestSeatSecondsRollover demonstrates that SeatSeconds overflow can cause bad stuff to happen.
 | ||||
| func TestSeatSecondsRollover(t *testing.T) { | ||||
| 	metrics.Register() | ||||
| 	now := time.Now() | ||||
| 
 | ||||
| 	const Quarter = 91 * 24 * time.Hour | ||||
| 
 | ||||
| 	clk, counter := testeventclock.NewFake(now, 0, nil) | ||||
| 	qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) | ||||
| 	qCfg := fq.QueuingConfig{ | ||||
| 		Name:             "TestSeatSecondsRollover", | ||||
| 		DesiredNumQueues: 9, | ||||
| 		QueueLengthLimit: 8, | ||||
| 		HandSize:         1, | ||||
| 		RequestWaitLimit: 40 * Quarter, | ||||
| 	} | ||||
| 	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 2000}) | ||||
| 
 | ||||
| 	uniformScenario{name: qCfg.Name, | ||||
| 		qs: qs, | ||||
| 		clients: []uniformClient{ | ||||
| 			newUniformClient(1001001001, 8, 20, Quarter, Quarter).seats(500), | ||||
| 			newUniformClient(2002002002, 7, 30, Quarter, Quarter/2).seats(500), | ||||
| 		}, | ||||
| 		concurrencyLimit:       2000, | ||||
| 		evalDuration:           Quarter * 40, | ||||
| 		expectedFair:           []bool{false}, | ||||
| 		expectedFairnessMargin: []float64{0.01}, | ||||
| 		expectAllRequests:      true, | ||||
| 		evalInqueueMetrics:     true, | ||||
| 		evalExecutingMetrics:   true, | ||||
| 		clk:                    clk, | ||||
| 		counter:                counter, | ||||
| 	}.exercise(t) | ||||
| } | ||||
| 
 | ||||
| func TestDifferentFlowsExpectUnequal(t *testing.T) { | ||||
| 	metrics.Register() | ||||
| 	now := time.Now() | ||||
|  | @ -1073,7 +1113,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { | |||
| } | ||||
| 
 | ||||
| func TestFindDispatchQueueLocked(t *testing.T) { | ||||
| 	var G float64 = 0.003 | ||||
| 	const G = 3 * time.Millisecond | ||||
| 	tests := []struct { | ||||
| 		name                    string | ||||
| 		robinIndex              int | ||||
|  | @ -1092,13 +1132,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { | |||
| 			robinIndex:       -1, | ||||
| 			queues: []*queue{ | ||||
| 				{ | ||||
| 					virtualStart: 200, | ||||
| 					nextDispatchR: SeatsTimesDuration(1, 200*time.Second), | ||||
| 					requests: newFIFO( | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, | ||||
| 					), | ||||
| 				}, | ||||
| 				{ | ||||
| 					virtualStart: 100, | ||||
| 					nextDispatchR: SeatsTimesDuration(1, 100*time.Second), | ||||
| 					requests: newFIFO( | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, | ||||
| 					), | ||||
|  | @ -1115,7 +1155,7 @@ func TestFindDispatchQueueLocked(t *testing.T) { | |||
| 			robinIndex:       -1, | ||||
| 			queues: []*queue{ | ||||
| 				{ | ||||
| 					virtualStart: 200, | ||||
| 					nextDispatchR: SeatsTimesDuration(1, 200*time.Second), | ||||
| 					requests: newFIFO( | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}}, | ||||
| 					), | ||||
|  | @ -1132,13 +1172,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { | |||
| 			robinIndex:       -1, | ||||
| 			queues: []*queue{ | ||||
| 				{ | ||||
| 					virtualStart: 200, | ||||
| 					nextDispatchR: SeatsTimesDuration(1, 200*time.Second), | ||||
| 					requests: newFIFO( | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 50}}, | ||||
| 					), | ||||
| 				}, | ||||
| 				{ | ||||
| 					virtualStart: 100, | ||||
| 					nextDispatchR: SeatsTimesDuration(1, 100*time.Second), | ||||
| 					requests: newFIFO( | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, | ||||
| 					), | ||||
|  | @ -1155,13 +1195,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { | |||
| 			robinIndex:       -1, | ||||
| 			queues: []*queue{ | ||||
| 				{ | ||||
| 					virtualStart: 200, | ||||
| 					nextDispatchR: SeatsTimesDuration(1, 200*time.Second), | ||||
| 					requests: newFIFO( | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}}, | ||||
| 					), | ||||
| 				}, | ||||
| 				{ | ||||
| 					virtualStart: 100, | ||||
| 					nextDispatchR: SeatsTimesDuration(1, 100*time.Second), | ||||
| 					requests: newFIFO( | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, | ||||
| 					), | ||||
|  | @ -1178,13 +1218,13 @@ func TestFindDispatchQueueLocked(t *testing.T) { | |||
| 			robinIndex:       -1, | ||||
| 			queues: []*queue{ | ||||
| 				{ | ||||
| 					virtualStart: 200, | ||||
| 					nextDispatchR: SeatsTimesDuration(1, 200*time.Second), | ||||
| 					requests: newFIFO( | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}}, | ||||
| 					), | ||||
| 				}, | ||||
| 				{ | ||||
| 					virtualStart: 100, | ||||
| 					nextDispatchR: SeatsTimesDuration(1, 100*time.Second), | ||||
| 					requests: newFIFO( | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}}, | ||||
| 					), | ||||
|  | @ -1204,10 +1244,10 @@ func TestFindDispatchQueueLocked(t *testing.T) { | |||
| 	for _, test := range tests { | ||||
| 		t.Run(test.name, func(t *testing.T) { | ||||
| 			qs := &queueSet{ | ||||
| 				estimatedServiceSeconds: G, | ||||
| 				robinIndex:              test.robinIndex, | ||||
| 				totSeatsInUse:           test.totSeatsInUse, | ||||
| 				qCfg:                    fq.QueuingConfig{Name: "TestSelectQueueLocked/" + test.name}, | ||||
| 				estimatedServiceDuration: G, | ||||
| 				robinIndex:               test.robinIndex, | ||||
| 				totSeatsInUse:            test.totSeatsInUse, | ||||
| 				qCfg:                     fq.QueuingConfig{Name: "TestSelectQueueLocked/" + test.name}, | ||||
| 				dCfg: fq.DispatchingConfig{ | ||||
| 					ConcurrencyLimit: test.concurrencyLimit, | ||||
| 				}, | ||||
|  |  | |||
|  | @ -62,7 +62,7 @@ type request struct { | |||
| 	arrivalTime time.Time | ||||
| 
 | ||||
| 	// arrivalR is R(arrivalTime).  R is, confusingly, also called "virtual time".
 | ||||
| 	arrivalR float64 | ||||
| 	arrivalR SeatSeconds | ||||
| 
 | ||||
| 	// descr1 and descr2 are not used in any logic but they appear in
 | ||||
| 	// log messages
 | ||||
|  | @ -84,9 +84,9 @@ type queue struct { | |||
| 	// The requests not yet executing in the real world are stored in a FIFO list.
 | ||||
| 	requests fifo | ||||
| 
 | ||||
| 	// virtualStart is the "virtual time" (R progress meter reading) at
 | ||||
| 	// nextDispatchR is the R progress meter reading at
 | ||||
| 	// which the next request will be dispatched in the virtual world.
 | ||||
| 	virtualStart float64 | ||||
| 	nextDispatchR SeatSeconds | ||||
| 
 | ||||
| 	// requestsExecuting is the count in the real world
 | ||||
| 	requestsExecuting int | ||||
|  | @ -130,7 +130,7 @@ func (q *queue) dump(includeDetails bool) debug.QueueDump { | |||
| 		return true | ||||
| 	}) | ||||
| 	return debug.QueueDump{ | ||||
| 		VirtualStart:      q.virtualStart, | ||||
| 		VirtualStart:      q.nextDispatchR.ToFloat(), // TODO: change QueueDump to use SeatSeconds
 | ||||
| 		Requests:          digest, | ||||
| 		ExecutingRequests: q.requestsExecuting, | ||||
| 		SeatsInUse:        q.seatsInUse, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue