Calculate the work in each request just once

Kubernetes-commit: f2c46c8f9d0b360cf913e22c222d9954b4ff9a76
This commit is contained in:
Mike Spreitzer 2021-10-07 17:20:56 -07:00 committed by Kubernetes Publisher
parent ebff4efdaf
commit a5192405d9
6 changed files with 67 additions and 53 deletions

View File

@ -140,11 +140,11 @@ func (l *requestFIFO) Walk(f walkFunc) {
func addToQueueSum(sum *queueSum, req *request) {
sum.InitialSeatsSum += req.InitialSeats()
sum.MaxSeatsSum += req.MaxSeats()
sum.AdditionalSeatSecondsSum += req.AdditionalSeatSeconds()
sum.TotalWorkSum += req.totalWork()
}
func deductFromQueueSum(sum *queueSum, req *request) {
sum.InitialSeatsSum -= req.InitialSeats()
sum.MaxSeatsSum -= req.MaxSeats()
sum.AdditionalSeatSecondsSum -= req.AdditionalSeatSeconds()
sum.TotalWorkSum -= req.totalWork()
}

View File

@ -153,12 +153,13 @@ func TestFIFOWithRemoveIsIdempotent(t *testing.T) {
}
func TestFIFOQueueWorkEstimate(t *testing.T) {
qs := &queueSet{estimatedServiceDuration: time.Second}
list := newRequestFIFO()
update := func(we *queueSum, req *request, multiplier int) {
we.InitialSeatsSum += multiplier * req.InitialSeats()
we.MaxSeatsSum += multiplier * req.MaxSeats()
we.AdditionalSeatSecondsSum += SeatSeconds(multiplier) * req.AdditionalSeatSeconds()
we.TotalWorkSum += SeatSeconds(multiplier) * req.totalWork()
}
assert := func(t *testing.T, want, got *queueSum) {
@ -168,11 +169,11 @@ func TestFIFOQueueWorkEstimate(t *testing.T) {
}
newRequest := func(initialSeats, finalSeats uint, additionalLatency time.Duration) *request {
return &request{workEstimate: fcrequest.WorkEstimate{
return &request{workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{
InitialSeats: initialSeats,
FinalSeats: finalSeats,
AdditionalLatency: additionalLatency,
}}
})}
}
arrival := []*request{
newRequest(1, 3, time.Second),

View File

@ -325,18 +325,6 @@ func (req *request) InitialSeats() int {
return int(req.workEstimate.InitialSeats)
}
// AdditionalSeatSeconds returns the amount of work in SeatSeconds produced by
// the final seats and the additional latency associated with a request.
func (req *request) AdditionalSeatSeconds() SeatSeconds {
return SeatsTimesDuration(float64(req.workEstimate.FinalSeats), req.workEstimate.AdditionalLatency)
}
// InitialSeatSeconds returns the amount of work in SeatSeconds projected
// by the initial seats for a given estimated service duration.
func (req *request) InitialSeatSeconds(estimatedServiceDuration time.Duration) SeatSeconds {
return SeatsTimesDuration(float64(req.workEstimate.InitialSeats), estimatedServiceDuration)
}
func (req *request) NoteQueued(inQueue bool) {
if req.queueNoteFn != nil {
req.queueNoteFn(inQueue)
@ -488,7 +476,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
descr1: descr1,
descr2: descr2,
queueNoteFn: queueNoteFn,
workEstimate: *workEstimate,
workEstimate: qs.completeWorkEstimate(workEstimate),
}
if ok := qs.rejectOrEnqueueLocked(req); !ok {
return nil
@ -516,7 +504,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
// this is the total amount of work in seat-seconds for requests
// waiting in this queue, we will select the queue with the minimum.
thisQueueSeatSeconds := SeatsTimesDuration(float64(queueSum.InitialSeatsSum), qs.estimatedServiceDuration) + queueSum.AdditionalSeatSecondsSum
thisQueueSeatSeconds := queueSum.TotalWorkSum
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d with sum: %#v and %d seats in use, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, queueSum, queue.seatsInUse, queue.nextDispatchR)
if thisQueueSeatSeconds < minQueueSeatSeconds {
minQueueSeatSeconds = thisQueueSeatSeconds
@ -634,7 +622,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
arrivalR: qs.currentR,
descr1: descr1,
descr2: descr2,
workEstimate: *workEstimate,
workEstimate: qs.completeWorkEstimate(workEstimate),
}
qs.totRequestsExecuting++
qs.totSeatsInUse += req.MaxSeats()
@ -683,7 +671,7 @@ func (qs *queueSet) dispatchLocked() bool {
request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
}
// When a request is dequeued for service -> qs.virtualStart += G * width
queue.nextDispatchR += request.InitialSeatSeconds(qs.estimatedServiceDuration) + request.AdditionalSeatSeconds()
queue.nextDispatchR += request.totalWork()
qs.boundNextDispatch(queue)
request.decision.Set(decisionExecute)
return ok
@ -734,7 +722,7 @@ func (qs *queueSet) findDispatchQueueLocked() *queue {
estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration)
dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress)
dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress)
currentVirtualFinish := queue.nextDispatchR + oldestWaiting.InitialSeatSeconds(qs.estimatedServiceDuration) + oldestWaiting.AdditionalSeatSeconds()
currentVirtualFinish := queue.nextDispatchR + oldestWaiting.totalWork()
klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish

View File

@ -1114,6 +1114,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
func TestFindDispatchQueueLocked(t *testing.T) {
const G = 3 * time.Millisecond
qs0 := &queueSet{estimatedServiceDuration: G}
tests := []struct {
name string
robinIndex int
@ -1134,13 +1135,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
{
nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}},
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
),
},
{
nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}},
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
),
},
},
@ -1157,7 +1158,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
{
nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1}},
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1})},
),
},
},
@ -1174,13 +1175,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
{
nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 50}},
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 50})},
),
},
{
nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}},
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
),
},
},
@ -1197,13 +1198,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
{
nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}},
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
),
},
{
nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}},
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
),
},
},
@ -1220,13 +1221,13 @@ func TestFindDispatchQueueLocked(t *testing.T) {
{
nextDispatchR: SeatsTimesDuration(1, 200*time.Second),
requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 10}},
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 10})},
),
},
{
nextDispatchR: SeatsTimesDuration(1, 100*time.Second),
requests: newFIFO(
&request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 25}},
&request{workEstimate: qs0.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 25})},
),
},
},
@ -1309,8 +1310,9 @@ func TestFinishRequestLocked(t *testing.T) {
now := time.Now()
clk, _ := testeventclock.NewFake(now, 0, nil)
qs := &queueSet{
clock: clk,
obsPair: newObserverPair(clk),
clock: clk,
estimatedServiceDuration: time.Second,
obsPair: newObserverPair(clk),
}
queue := &queue{
requests: newRequestFIFO(),
@ -1318,7 +1320,7 @@ func TestFinishRequestLocked(t *testing.T) {
r := &request{
qs: qs,
queue: queue,
workEstimate: test.workEstimate,
workEstimate: qs.completeWorkEstimate(&test.workEstimate),
}
qs.totRequestsExecuting = 111
@ -1355,6 +1357,7 @@ func TestFinishRequestLocked(t *testing.T) {
}
func TestRequestSeats(t *testing.T) {
qs := &queueSet{estimatedServiceDuration: time.Second}
tests := []struct {
name string
request *request
@ -1362,17 +1365,17 @@ func TestRequestSeats(t *testing.T) {
}{
{
name: "",
request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 3}},
request: &request{workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 3})},
expected: 3,
},
{
name: "",
request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 1, FinalSeats: 3}},
request: &request{workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 1, FinalSeats: 3})},
expected: 3,
},
{
name: "",
request: &request{workEstimate: fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 1}},
request: &request{workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{InitialSeats: 3, FinalSeats: 1})},
expected: 3,
},
}
@ -1387,19 +1390,20 @@ func TestRequestSeats(t *testing.T) {
}
}
func TestRequestAdditionalSeatSeconds(t *testing.T) {
func TestRequestWork(t *testing.T) {
qs := &queueSet{estimatedServiceDuration: 2 * time.Second}
request := &request{
workEstimate: fcrequest.WorkEstimate{
workEstimate: qs.completeWorkEstimate(&fcrequest.WorkEstimate{
InitialSeats: 3,
FinalSeats: 5,
AdditionalLatency: 3 * time.Second,
},
FinalSeats: 50,
AdditionalLatency: 70 * time.Second,
}),
}
got := request.AdditionalSeatSeconds()
want := SeatsTimesDuration(5, 3*time.Second)
got := request.totalWork()
want := SeatsTimesDuration(3, 2*time.Second) + SeatsTimesDuration(50, 70*time.Second)
if want != got {
t.Errorf("Expected AdditionalSeatSeconds: %v, but got: %v", want, got)
t.Errorf("Expected totalWork: %v, but got: %v", want, got)
}
}

View File

@ -47,7 +47,7 @@ type request struct {
startTime time.Time
// estimated amount of work of the request
workEstimate fcrequest.WorkEstimate
workEstimate completedWorkEstimate
// decision gets set to a `requestDecision` indicating what to do
// with this request. It gets set exactly once, when the request
@ -78,6 +78,11 @@ type request struct {
removeFromQueueFn removeFromFIFOFunc
}
type completedWorkEstimate struct {
fcrequest.WorkEstimate
totalWork SeatSeconds // initial plus final work
}
// queue is an array of requests with additional metadata required for
// the FQScheduler
type queue struct {
@ -97,8 +102,8 @@ type queue struct {
seatsInUse int
}
// queueSum tracks the sum of initial seats, final seats, and
// additional latency aggregated from all requests in a given queue
// queueSum tracks the sum of initial seats, max seats, and
// totalWork from all requests in a given queue
type queueSum struct {
// InitialSeatsSum is the sum of InitialSeats
// associated with all requests in a given queue.
@ -108,9 +113,23 @@ type queueSum struct {
// associated with all requests in a given queue.
MaxSeatsSum int
// AdditionalSeatSecondsSum is sum of AdditionalSeatsSeconds
// associated with all requests in a given queue.
AdditionalSeatSecondsSum SeatSeconds
// TotalWorkSum is the sum of totalWork of the waiting requests
TotalWorkSum SeatSeconds
}
func (req *request) totalWork() SeatSeconds {
return req.workEstimate.totalWork
}
func (qs *queueSet) completeWorkEstimate(we *fcrequest.WorkEstimate) completedWorkEstimate {
return completedWorkEstimate{
WorkEstimate: *we,
totalWork: qs.computeTotalWork(we),
}
}
func (qs *queueSet) computeTotalWork(we *fcrequest.WorkEstimate) SeatSeconds {
return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration) + SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency)
}
// Enqueue enqueues a request into the queue and

View File

@ -33,6 +33,8 @@ const (
maximumSeats = 10
)
// WorkEstimate carries three of the four parameters that determine the work in a request.
// The fourth parameter is the duration of the initial phase of execution.
type WorkEstimate struct {
// InitialSeats is the number of seats occupied while the server is
// executing this request.
@ -49,8 +51,8 @@ type WorkEstimate struct {
AdditionalLatency time.Duration
}
// MaxSeats returns the number of seats this request requires, it is the maximum
// of the two, WorkEstimate.InitialSeats and WorkEstimate.FinalSeats.
// MaxSeats returns the maximum number of seats the request occupies over the
// phases of being served.
func (we *WorkEstimate) MaxSeats() int {
if we.InitialSeats >= we.FinalSeats {
return int(we.InitialSeats)