Update queueset_test.go for FinalSeats
Track the introduction of FinalSeats. Give up on calculating expected results for tests with added latency, because I did not find an easy and obvious way to do it. Kubernetes-commit: 0fc595e03360ba7fc4c3e251d4b41f39172aca72
This commit is contained in:
parent
518b315d81
commit
d69d77c659
|
@ -111,17 +111,18 @@ type uniformClient struct {
|
|||
// padDuration is additional time during which this request occupies its seats.
|
||||
// This comes at the end of execution, after the reply has been released toward
|
||||
// the client.
|
||||
// The evaluation code below can only handle two cases:
|
||||
// - this padding always keeps another request out of the seats, or
|
||||
// - this padding never keeps another request out of the seats.
|
||||
// Set the `padConstrains` field of the scenario accordingly.
|
||||
// The evaluation code below does not take this into account.
|
||||
// In cases where `padDuration` makes a difference,
|
||||
// set the `expectedAverages` field of `uniformScenario`.
|
||||
padDuration time.Duration
|
||||
// When true indicates that only half the specified number of
|
||||
// threads should run during the first half of the evaluation
|
||||
// period
|
||||
split bool
|
||||
// width is the number of seats this request occupies while executing
|
||||
width uint
|
||||
// initialSeats is the number of seats this request occupies in the first phase of execution
|
||||
initialSeats uint
|
||||
// finalSeats is the number occupied during the second phase of execution
|
||||
finalSeats uint
|
||||
}
|
||||
|
||||
func newUniformClient(hash uint64, nThreads, nCalls int, execDuration, thinkDuration time.Duration) uniformClient {
|
||||
|
@ -131,7 +132,8 @@ func newUniformClient(hash uint64, nThreads, nCalls int, execDuration, thinkDura
|
|||
nCalls: nCalls,
|
||||
execDuration: execDuration,
|
||||
thinkDuration: thinkDuration,
|
||||
width: 1,
|
||||
initialSeats: 1,
|
||||
finalSeats: 1,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,12 +142,13 @@ func (uc uniformClient) setSplit() uniformClient {
|
|||
return uc
|
||||
}
|
||||
|
||||
func (uc uniformClient) seats(width uint) uniformClient {
|
||||
uc.width = width
|
||||
func (uc uniformClient) setInitWidth(seats uint) uniformClient {
|
||||
uc.initialSeats = seats
|
||||
return uc
|
||||
}
|
||||
|
||||
func (uc uniformClient) pad(duration time.Duration) uniformClient {
|
||||
func (uc uniformClient) pad(finalSeats int, duration time.Duration) uniformClient {
|
||||
uc.finalSeats = uint(finalSeats)
|
||||
uc.padDuration = duration
|
||||
return uc
|
||||
}
|
||||
|
@ -163,8 +166,7 @@ func (uc uniformClient) pad(duration time.Duration) uniformClient {
|
|||
// fair in the respective halves of a split scenario;
|
||||
// in a non-split scenario this is a singleton with one expectation.
|
||||
// expectAllRequests indicates whether all requests are expected to get dispatched.
|
||||
// padConstrains indicates whether the execution duration padding, if any,
|
||||
// is expected to hold up dispatching.
|
||||
// expectedAverages, if provided, replaces the normal calculation of expected results.
|
||||
type uniformScenario struct {
|
||||
name string
|
||||
qs fq.QueueSet
|
||||
|
@ -178,7 +180,7 @@ type uniformScenario struct {
|
|||
rejectReason string
|
||||
clk *testeventclock.Fake
|
||||
counter counter.GoRoutineCounter
|
||||
padConstrains bool
|
||||
expectedAverages []float64
|
||||
expectedEpochAdvances int
|
||||
}
|
||||
|
||||
|
@ -267,7 +269,7 @@ func (ust *uniformScenarioThread) callK(k int) {
|
|||
if k >= ust.nCalls {
|
||||
return
|
||||
}
|
||||
req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{InitialSeats: ust.uc.width, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
|
||||
req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{InitialSeats: ust.uc.initialSeats, FinalSeats: ust.uc.finalSeats, AdditionalLatency: ust.uc.padDuration}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
|
||||
ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
|
||||
if req == nil {
|
||||
atomic.AddUint64(&ust.uss.failedCount, 1)
|
||||
|
@ -283,11 +285,11 @@ func (ust *uniformScenarioThread) callK(k int) {
|
|||
executed = true
|
||||
execStart := ust.uss.clk.Now()
|
||||
atomic.AddInt32(&ust.uss.executions[ust.i], 1)
|
||||
ust.igr.Add(float64(ust.uc.width))
|
||||
ust.uss.t.Logf("%s: %d, %d, %d executing; seats=%d", execStart.Format(nsTimeFmt), ust.i, ust.j, k, ust.uc.width)
|
||||
ust.igr.Add(float64(ust.uc.initialSeats))
|
||||
ust.uss.t.Logf("%s: %d, %d, %d executing; width1=%d", execStart.Format(nsTimeFmt), ust.i, ust.j, k, ust.uc.initialSeats)
|
||||
ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration)
|
||||
ust.uss.clk.Sleep(ust.uc.execDuration)
|
||||
ust.igr.Add(-float64(ust.uc.width))
|
||||
ust.igr.Add(-float64(ust.uc.initialSeats))
|
||||
returnTime = ust.uss.clk.Now()
|
||||
})
|
||||
now := ust.uss.clk.Now()
|
||||
|
@ -304,9 +306,9 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma
|
|||
uss.clk.Run(&lim)
|
||||
uss.clk.SetTime(lim)
|
||||
if uss.doSplit && !last {
|
||||
uss.t.Logf("%s: End of first half", uss.clk.Now().Format(nsTimeFmt))
|
||||
uss.t.Logf("%s: End of first half of scenario %q", uss.clk.Now().Format(nsTimeFmt), uss.name)
|
||||
} else {
|
||||
uss.t.Logf("%s: End", uss.clk.Now().Format(nsTimeFmt))
|
||||
uss.t.Logf("%s: End of scenario %q", uss.clk.Now().Format(nsTimeFmt), uss.name)
|
||||
}
|
||||
demands := make([]float64, len(uss.clients))
|
||||
averages := make([]float64, len(uss.clients))
|
||||
|
@ -316,13 +318,13 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, ma
|
|||
nThreads = nThreads / 2
|
||||
}
|
||||
sep := uc.thinkDuration
|
||||
if uss.padConstrains && uc.padDuration > sep {
|
||||
sep = uc.padDuration
|
||||
}
|
||||
demands[i] = float64(nThreads) * float64(uc.width) * float64(uc.execDuration) / float64(sep+uc.execDuration)
|
||||
demands[i] = float64(nThreads) * float64(uc.initialSeats) * float64(uc.execDuration) / float64(sep+uc.execDuration)
|
||||
averages[i] = uss.integrators[i].Reset().Average
|
||||
}
|
||||
fairAverages := fairAlloc(demands, float64(uss.concurrencyLimit))
|
||||
fairAverages := uss.expectedAverages
|
||||
if fairAverages == nil {
|
||||
fairAverages = fairAlloc(demands, float64(uss.concurrencyLimit))
|
||||
}
|
||||
for i := range uss.clients {
|
||||
expectedAverage := fairAverages[i]
|
||||
var gotFair bool
|
||||
|
@ -503,50 +505,74 @@ func TestBaseline(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSeparations(t *testing.T) {
|
||||
for _, seps := range []struct{ think, pad time.Duration }{
|
||||
{think: time.Second, pad: 0},
|
||||
{think: 0, pad: time.Second},
|
||||
{think: time.Second, pad: time.Second / 2},
|
||||
{think: time.Second / 2, pad: time.Second},
|
||||
flts := func(avgs ...float64) []float64 { return avgs }
|
||||
for _, seps := range []struct {
|
||||
think, pad time.Duration
|
||||
finalSeats, conc, nClients int
|
||||
exp []float64 // override expected results
|
||||
}{
|
||||
{think: time.Second, pad: 0, finalSeats: 1, conc: 1, nClients: 1},
|
||||
{think: time.Second, pad: 0, finalSeats: 1, conc: 2, nClients: 1},
|
||||
{think: time.Second, pad: 0, finalSeats: 2, conc: 2, nClients: 1},
|
||||
{think: time.Second, pad: 0, finalSeats: 1, conc: 1, nClients: 2},
|
||||
{think: time.Second, pad: 0, finalSeats: 1, conc: 2, nClients: 2},
|
||||
{think: time.Second, pad: 0, finalSeats: 2, conc: 2, nClients: 2},
|
||||
{think: 0, pad: time.Second, finalSeats: 1, conc: 1, nClients: 1, exp: flts(0.5)},
|
||||
{think: 0, pad: time.Second, finalSeats: 1, conc: 2, nClients: 1},
|
||||
{think: 0, pad: time.Second, finalSeats: 2, conc: 2, nClients: 1, exp: flts(0.5)},
|
||||
{think: 0, pad: time.Second, finalSeats: 1, conc: 1, nClients: 2, exp: flts(0.25, 0.25)},
|
||||
{think: 0, pad: time.Second, finalSeats: 1, conc: 2, nClients: 2, exp: flts(0.5, 0.5)},
|
||||
{think: 0, pad: time.Second, finalSeats: 2, conc: 2, nClients: 2, exp: flts(0.25, 0.25)},
|
||||
{think: time.Second, pad: time.Second / 2, finalSeats: 1, conc: 1, nClients: 1},
|
||||
{think: time.Second, pad: time.Second / 2, finalSeats: 1, conc: 2, nClients: 1},
|
||||
{think: time.Second, pad: time.Second / 2, finalSeats: 2, conc: 2, nClients: 1},
|
||||
{think: time.Second, pad: time.Second / 2, finalSeats: 1, conc: 1, nClients: 2, exp: flts(1.0/3, 1.0/3)},
|
||||
{think: time.Second, pad: time.Second / 2, finalSeats: 1, conc: 2, nClients: 2},
|
||||
{think: time.Second, pad: time.Second / 2, finalSeats: 2, conc: 2, nClients: 2, exp: flts(1.0/3, 1.0/3)},
|
||||
{think: time.Second / 2, pad: time.Second, finalSeats: 1, conc: 1, nClients: 1, exp: flts(0.5)},
|
||||
{think: time.Second / 2, pad: time.Second, finalSeats: 1, conc: 2, nClients: 1},
|
||||
{think: time.Second / 2, pad: time.Second, finalSeats: 2, conc: 2, nClients: 1, exp: flts(0.5)},
|
||||
{think: time.Second / 2, pad: time.Second, finalSeats: 1, conc: 1, nClients: 2, exp: flts(0.25, 0.25)},
|
||||
{think: time.Second / 2, pad: time.Second, finalSeats: 1, conc: 2, nClients: 2, exp: flts(0.5, 0.5)},
|
||||
{think: time.Second / 2, pad: time.Second, finalSeats: 2, conc: 2, nClients: 2, exp: flts(0.25, 0.25)},
|
||||
} {
|
||||
for conc := 1; conc <= 2; conc++ {
|
||||
caseName := fmt.Sprintf("seps%v,%v,%v", seps.think, seps.pad, conc)
|
||||
t.Run(caseName, func(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
caseName := fmt.Sprintf("think=%v,finalSeats=%d,pad=%v,nClients=%d,conc=%d", seps.think, seps.finalSeats, seps.pad, seps.nClients, seps.conc)
|
||||
t.Run(caseName, func(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testeventclock.NewFake(now, 0, nil)
|
||||
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: caseName,
|
||||
DesiredNumQueues: 9,
|
||||
QueueLengthLimit: 8,
|
||||
HandSize: 3,
|
||||
RequestWaitLimit: 10 * time.Minute,
|
||||
}
|
||||
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: conc})
|
||||
uniformScenario{name: qCfg.Name,
|
||||
qs: qs,
|
||||
clients: []uniformClient{
|
||||
newUniformClient(1001001001, 1, 19, time.Second, seps.think).pad(seps.pad),
|
||||
},
|
||||
concurrencyLimit: conc,
|
||||
evalDuration: time.Second * 18, // multiple of every period involved, so that margin can be 0 below
|
||||
expectedFair: []bool{true},
|
||||
expectedFairnessMargin: []float64{0},
|
||||
expectAllRequests: true,
|
||||
evalInqueueMetrics: true,
|
||||
evalExecutingMetrics: true,
|
||||
clk: clk,
|
||||
counter: counter,
|
||||
padConstrains: conc == 1,
|
||||
}.exercise(t)
|
||||
})
|
||||
}
|
||||
clk, counter := testeventclock.NewFake(now, 0, nil)
|
||||
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestSeparations/" + caseName,
|
||||
DesiredNumQueues: 9,
|
||||
QueueLengthLimit: 8,
|
||||
HandSize: 3,
|
||||
RequestWaitLimit: 10 * time.Minute,
|
||||
}
|
||||
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: seps.conc})
|
||||
uniformScenario{name: qCfg.Name,
|
||||
qs: qs,
|
||||
clients: []uniformClient{
|
||||
newUniformClient(1001001001, 1, 25, time.Second, seps.think).pad(seps.finalSeats, seps.pad),
|
||||
newUniformClient(2002002002, 1, 25, time.Second, seps.think).pad(seps.finalSeats, seps.pad),
|
||||
}[:seps.nClients],
|
||||
concurrencyLimit: seps.conc,
|
||||
evalDuration: time.Second * 24, // multiple of every period involved, so that margin can be 0 below
|
||||
expectedFair: []bool{true},
|
||||
expectedFairnessMargin: []float64{0},
|
||||
expectAllRequests: true,
|
||||
evalInqueueMetrics: true,
|
||||
evalExecutingMetrics: true,
|
||||
clk: clk,
|
||||
counter: counter,
|
||||
expectedAverages: seps.exp,
|
||||
}.exercise(t)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -685,8 +711,8 @@ func TestSeatSecondsRollover(t *testing.T) {
|
|||
uniformScenario{name: qCfg.Name,
|
||||
qs: qs,
|
||||
clients: []uniformClient{
|
||||
newUniformClient(1001001001, 8, 20, Quarter, Quarter).seats(500),
|
||||
newUniformClient(2002002002, 7, 30, Quarter, Quarter/2).seats(500),
|
||||
newUniformClient(1001001001, 8, 20, Quarter, Quarter).setInitWidth(500),
|
||||
newUniformClient(2002002002, 7, 30, Quarter, Quarter/2).setInitWidth(500),
|
||||
},
|
||||
concurrencyLimit: 2000,
|
||||
evalDuration: Quarter * 40,
|
||||
|
@ -760,7 +786,7 @@ func TestDifferentWidths(t *testing.T) {
|
|||
qs: qs,
|
||||
clients: []uniformClient{
|
||||
newUniformClient(10010010010010, 13, 10, time.Second, time.Second-1),
|
||||
newUniformClient(20020020020020, 7, 10, time.Second, time.Second-1).seats(2),
|
||||
newUniformClient(20020020020020, 7, 10, time.Second, time.Second-1).setInitWidth(2),
|
||||
},
|
||||
concurrencyLimit: 6,
|
||||
evalDuration: time.Second * 20,
|
||||
|
@ -795,11 +821,11 @@ func TestTooWide(t *testing.T) {
|
|||
uniformScenario{name: qCfg.Name,
|
||||
qs: qs,
|
||||
clients: []uniformClient{
|
||||
newUniformClient(40040040040040, 15, 21, time.Second, time.Second-1).seats(2),
|
||||
newUniformClient(50050050050050, 15, 21, time.Second, time.Second-1).seats(2),
|
||||
newUniformClient(60060060060060, 15, 21, time.Second, time.Second-1).seats(2),
|
||||
newUniformClient(70070070070070, 15, 21, time.Second, time.Second-1).seats(2),
|
||||
newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).seats(7),
|
||||
newUniformClient(40040040040040, 15, 21, time.Second, time.Second-1).setInitWidth(2),
|
||||
newUniformClient(50050050050050, 15, 21, time.Second, time.Second-1).setInitWidth(2),
|
||||
newUniformClient(60060060060060, 15, 21, time.Second, time.Second-1).setInitWidth(2),
|
||||
newUniformClient(70070070070070, 15, 21, time.Second, time.Second-1).setInitWidth(2),
|
||||
newUniformClient(90090090090090, 15, 21, time.Second, time.Second-1).setInitWidth(7),
|
||||
},
|
||||
concurrencyLimit: 6,
|
||||
evalDuration: time.Second * 225,
|
||||
|
@ -1141,7 +1167,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
|
|||
robinIndexExpected []int
|
||||
}{
|
||||
{
|
||||
name: "width=1, seats are available, the queue with less virtual start time wins",
|
||||
name: "width1=1, seats are available, the queue with less virtual start time wins",
|
||||
concurrencyLimit: 1,
|
||||
totSeatsInUse: 0,
|
||||
robinIndex: -1,
|
||||
|
@ -1164,7 +1190,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
|
|||
robinIndexExpected: []int{1},
|
||||
},
|
||||
{
|
||||
name: "width=1, all seats are occupied, no queue is picked",
|
||||
name: "width1=1, all seats are occupied, no queue is picked",
|
||||
concurrencyLimit: 1,
|
||||
totSeatsInUse: 1,
|
||||
robinIndex: -1,
|
||||
|
@ -1181,7 +1207,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
|
|||
robinIndexExpected: []int{0},
|
||||
},
|
||||
{
|
||||
name: "width > 1, seats are available for request with the least finish R, queue is picked",
|
||||
name: "width1 > 1, seats are available for request with the least finish R, queue is picked",
|
||||
concurrencyLimit: 50,
|
||||
totSeatsInUse: 25,
|
||||
robinIndex: -1,
|
||||
|
@ -1204,7 +1230,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
|
|||
robinIndexExpected: []int{1},
|
||||
},
|
||||
{
|
||||
name: "width > 1, seats are not available for request with the least finish R, queue is not picked",
|
||||
name: "width1 > 1, seats are not available for request with the least finish R, queue is not picked",
|
||||
concurrencyLimit: 50,
|
||||
totSeatsInUse: 26,
|
||||
robinIndex: -1,
|
||||
|
@ -1227,7 +1253,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
|
|||
robinIndexExpected: []int{1, 1, 1},
|
||||
},
|
||||
{
|
||||
name: "width > 1, seats become available before 3rd attempt, queue is picked",
|
||||
name: "width1 > 1, seats become available before 3rd attempt, queue is picked",
|
||||
concurrencyLimit: 50,
|
||||
totSeatsInUse: 26,
|
||||
robinIndex: -1,
|
||||
|
@ -1304,7 +1330,8 @@ func TestFinishRequestLocked(t *testing.T) {
|
|||
{
|
||||
name: "request has additional latency",
|
||||
workEstimate: fcrequest.WorkEstimate{
|
||||
InitialSeats: 10,
|
||||
InitialSeats: 1,
|
||||
FinalSeats: 10,
|
||||
AdditionalLatency: time.Minute,
|
||||
},
|
||||
},
|
||||
|
@ -1344,9 +1371,9 @@ func TestFinishRequestLocked(t *testing.T) {
|
|||
|
||||
var (
|
||||
queuesetTotalRequestsExecutingExpected = qs.totRequestsExecuting - 1
|
||||
queuesetTotalSeatsInUseExpected = qs.totSeatsInUse - int(test.workEstimate.InitialSeats)
|
||||
queuesetTotalSeatsInUseExpected = qs.totSeatsInUse - test.workEstimate.MaxSeats()
|
||||
queueRequestsExecutingExpected = queue.requestsExecuting - 1
|
||||
queueSeatsInUseExpected = queue.seatsInUse - int(test.workEstimate.InitialSeats)
|
||||
queueSeatsInUseExpected = queue.seatsInUse - test.workEstimate.MaxSeats()
|
||||
)
|
||||
|
||||
qs.finishRequestLocked(r)
|
||||
|
|
Loading…
Reference in New Issue