Refactor goroutine counting

Add comment outlining TestContextCancel.

Stop calling `t.Errorf` from wrong goroutine.

Package up queueNoteFn expectation checking.

Add counting of goroutine in req1 exec fn.

Remove unnecessary assignment to `_`.

Make TestContextCancel wait on fake clock, to insulate timing check
from scheduler noise.

Factor goroutine counting out of queueset.go, into queueset_test.go,
where it matters.

Refactor promise: Use a simple channel-based implementation for normal
code, a mutex-based one for testing code.

Took all the panics out of queueset.go

Shrink the timeouts in promise tests to 1 second.

Kubernetes-commit: 1db36ae3b30e30d70972998a22987a7db470479b
This commit is contained in:
Mike Spreitzer 2021-07-29 00:35:25 -04:00 committed by Kubernetes Publisher
parent f7215cdf93
commit 8c2108bc80
10 changed files with 561 additions and 324 deletions

View File

@ -22,7 +22,6 @@ import (
"time"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
@ -83,7 +82,6 @@ func New(
serverConcurrencyLimit int,
requestWaitLimit time.Duration,
) Interface {
grc := counter.NoOp{}
clk := eventclock.Real{}
return NewTestable(TestableConfig{
Name: "Controller",
@ -95,7 +93,7 @@ func New(
ServerConcurrencyLimit: serverConcurrencyLimit,
RequestWaitLimit: requestWaitLimit,
ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
QueueSetFactory: fqs.NewQueueSetFactory(clk, grc),
QueueSetFactory: fqs.NewQueueSetFactory(clk),
})
}

View File

@ -1,25 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package counter
// NoOp is a GoRoutineCounter that does not actually count
type NoOp struct{}
var _ GoRoutineCounter = NoOp{}
// Add would adjust the count, if a count were being kept
func (NoOp) Add(int) {}

View File

@ -19,17 +19,12 @@ package promise
// WriteOnce represents a variable that is initially not set and can
// be set once and is readable. This is the common meaning for
// "promise".
// The implementations of this interface are NOT thread-safe.
type WriteOnce interface {
// Get reads the current value of this variable. If this variable
// is not set yet then this call blocks until this variable gets a
// value.
// Get reads the current value of this variable. If this
// variable is not set yet then this call blocks until this
// variable gets a value.
Get() interface{}
// IsSet returns immediately with an indication of whether this
// variable has been set.
IsSet() bool
// Set normally writes a value into this variable, unblocks every
// goroutine waiting for this variable to have a value, and
// returns true. In the unhappy case that this variable is

View File

@ -18,57 +18,53 @@ package promise
import (
"sync"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
)
// promise implements the promise.WriteOnce interface.
// This implementation is based on a condition variable.
// This implementation tracks active goroutines:
// the given counter is decremented for a goroutine waiting for this
// varible to be set and incremented when such a goroutine is
// unblocked.
// promise implements the WriteOnce interface.
type promise struct {
cond sync.Cond
activeCounter counter.GoRoutineCounter // counter of active goroutines
waitingCount int // number of goroutines idle due to this being unset
isSet bool
value interface{}
doneCh <-chan struct{}
doneVal interface{}
setCh chan struct{}
onceler sync.Once
value interface{}
}
var _ WriteOnce = &promise{}
// NewWriteOnce makes a new promise.LockingWriteOnce
func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) WriteOnce {
return &promise{
cond: *sync.NewCond(lock),
activeCounter: activeCounter,
// NewWriteOnce makes a new thread-safe WriteOnce.
//
// If `initial` is non-nil then that value is Set at creation time.
//
// If a `Get` is waiting soon after `doneCh` becomes selectable (which
// never happens for the nil channel) then `Set(doneVal)` effectively
// happens at that time.
func NewWriteOnce(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) WriteOnce {
p := &promise{
doneCh: doneCh,
doneVal: doneVal,
setCh: make(chan struct{}),
}
if initial != nil {
p.Set(initial)
}
return p
}
func (p *promise) Get() interface{} {
if !p.isSet {
p.waitingCount++
p.activeCounter.Add(-1)
p.cond.Wait()
select {
case <-p.setCh:
case <-p.doneCh:
p.Set(p.doneVal)
}
return p.value
}
func (p *promise) IsSet() bool {
return p.isSet
}
func (p *promise) Set(value interface{}) bool {
if p.isSet {
return false
}
p.isSet = true
p.value = value
if p.waitingCount > 0 {
p.activeCounter.Add(p.waitingCount)
p.waitingCount = 0
p.cond.Broadcast()
}
return true
var ans bool
p.onceler.Do(func() {
p.value = value
close(p.setCh)
ans = true
})
return ans
}

View File

@ -17,105 +17,103 @@ limitations under the License.
package promise
import (
"sync"
"sync/atomic"
"context"
"testing"
"time"
testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock"
"k8s.io/apimachinery/pkg/util/wait"
)
func TestLockingWriteOnce(t *testing.T) {
func TestWriteOnceSet(t *testing.T) {
oldTime := time.Now()
cval := &oldTime
ctx, cancel := context.WithCancel(context.Background())
wr := NewWriteOnce(nil, ctx.Done(), cval)
gots := make(chan interface{})
goGetExpectNotYet(t, wr, gots, "Set")
now := time.Now()
clock, counter := testclock.NewFake(now, 0, nil)
var lock sync.Mutex
wr := NewWriteOnce(&lock, counter)
var gots int32
var got atomic.Value
counter.Add(1)
go func() {
lock.Lock()
defer lock.Unlock()
got.Store(wr.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 0 {
t.Error("Get returned before Set")
}
func() {
lock.Lock()
defer lock.Unlock()
if wr.IsSet() {
t.Error("IsSet before Set")
}
}()
aval := &now
func() {
lock.Lock()
defer lock.Unlock()
if !wr.Set(aval) {
t.Error("Set() returned false")
}
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 1 {
t.Error("Get did not return after Set")
if !wr.Set(aval) {
t.Error("Set() returned false")
}
if got.Load() != aval {
t.Error("Get did not return what was Set")
}
func() {
lock.Lock()
defer lock.Unlock()
if !wr.IsSet() {
t.Error("IsSet()==false after Set")
}
}()
counter.Add(1)
go func() {
lock.Lock()
defer lock.Unlock()
got.Store(wr.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 2 {
t.Error("Second Get did not return immediately")
}
if got.Load() != aval {
t.Error("Second Get did not return what was Set")
}
func() {
lock.Lock()
defer lock.Unlock()
if !wr.IsSet() {
t.Error("IsSet()==false after second Get")
}
}()
expectGotValue(t, gots, aval)
goGetAndExpect(t, wr, gots, aval)
later := time.Now()
bval := &later
func() {
lock.Lock()
defer lock.Unlock()
if wr.Set(bval) {
t.Error("second Set() returned true")
}
}()
func() {
lock.Lock()
defer lock.Unlock()
if !wr.IsSet() {
t.Error("IsSet() returned false after second set")
} else if wr.Get() != aval {
t.Error("Get() after second Set returned wrong value")
}
}()
if wr.Set(bval) {
t.Error("second Set() returned true")
}
goGetAndExpect(t, wr, gots, aval)
cancel()
time.Sleep(time.Second) // give it a chance to misbehave
goGetAndExpect(t, wr, gots, aval)
}
func TestWriteOnceCancel(t *testing.T) {
oldTime := time.Now()
cval := &oldTime
ctx, cancel := context.WithCancel(context.Background())
wr := NewWriteOnce(nil, ctx.Done(), cval)
gots := make(chan interface{})
goGetExpectNotYet(t, wr, gots, "cancel")
cancel()
expectGotValue(t, gots, cval)
goGetAndExpect(t, wr, gots, cval)
later := time.Now()
bval := &later
if wr.Set(bval) {
t.Error("Set() after cancel returned true")
}
goGetAndExpect(t, wr, gots, cval)
}
func TestWriteOnceInitial(t *testing.T) {
oldTime := time.Now()
cval := &oldTime
ctx, cancel := context.WithCancel(context.Background())
now := time.Now()
aval := &now
wr := NewWriteOnce(aval, ctx.Done(), cval)
gots := make(chan interface{})
goGetAndExpect(t, wr, gots, aval)
later := time.Now()
bval := &later
if wr.Set(bval) {
t.Error("Set of initialized promise returned true")
}
goGetAndExpect(t, wr, gots, aval)
cancel()
time.Sleep(time.Second) // give it a chance to misbehave
goGetAndExpect(t, wr, gots, aval)
}
func goGetExpectNotYet(t *testing.T, wr WriteOnce, gots chan interface{}, trigger string) {
go func() {
gots <- wr.Get()
}()
select {
case <-gots:
t.Errorf("Get returned before %s", trigger)
case <-time.After(time.Second):
t.Log("Good: Get did not return yet")
}
}
func goGetAndExpect(t *testing.T, wr WriteOnce, gots chan interface{}, expected interface{}) {
go func() {
gots <- wr.Get()
}()
expectGotValue(t, gots, expected)
}
func expectGotValue(t *testing.T, gots <-chan interface{}, expected interface{}) {
select {
case gotVal := <-gots:
t.Logf("Got %v", gotVal)
if gotVal != expected {
t.Errorf("Get returned %v, expected: %v", gotVal, expected)
}
case <-time.After(wait.ForeverTestTimeout):
t.Error("Get did not return")
}
}

View File

@ -23,8 +23,6 @@ import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
@ -45,10 +43,18 @@ const nsTimeFmt = "2006-01-02 15:04:05.000000000"
// queueSetFactory implements the QueueSetFactory interface
// queueSetFactory makes QueueSet objects.
type queueSetFactory struct {
counter counter.GoRoutineCounter
clock eventclock.Interface
clock eventclock.Interface
promiseFactoryFactory promiseFactoryFactory
}
// promiseFactory returns a WriteOnce
// - whose Set method is invoked with the queueSet locked, and
// - whose Get method is invoked with the queueSet not locked.
type promiseFactory func(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promise.WriteOnce
// promiseFactoryFactory returns the promiseFactory to use for the given queueSet
type promiseFactoryFactory func(*queueSet) promiseFactory
// `*queueSetCompleter` implements QueueSetCompleter. Exactly one of
// the fields `factory` and `theSet` is non-nil.
type queueSetCompleter struct {
@ -61,8 +67,8 @@ type queueSetCompleter struct {
// queueSet implements the Fair Queuing for Server Requests technique
// described in this package's doc, and a pointer to one implements
// the QueueSet interface. The clock, GoRoutineCounter, and estimated
// service time should not be changed; the fields listed after the
// the QueueSet interface. The fields listed before the lock
// should not be changed; the fields listed after the
// lock must be accessed only while holding the lock. The methods of
// this type follow the naming convention that the suffix "Locked"
// means the caller must hold the lock; for a method whose name does
@ -70,10 +76,11 @@ type queueSetCompleter struct {
// locking.
type queueSet struct {
clock eventclock.Interface
counter counter.GoRoutineCounter
estimatedServiceTime float64
obsPair metrics.TimedObserverPair
promiseFactory promiseFactory
lock sync.Mutex
// qCfg holds the current queuing configuration. Its
@ -119,10 +126,15 @@ type queueSet struct {
}
// NewQueueSetFactory creates a new QueueSetFactory object
func NewQueueSetFactory(c eventclock.Interface, counter counter.GoRoutineCounter) fq.QueueSetFactory {
func NewQueueSetFactory(c eventclock.Interface) fq.QueueSetFactory {
return newTestableQueueSetFactory(c, ordinaryPromiseFactoryFactory)
}
// newTestableQueueSetFactory creates a new QueueSetFactory object with the given promiseFactoryFactory
func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory promiseFactoryFactory) fq.QueueSetFactory {
return &queueSetFactory{
counter: counter,
clock: c,
clock: c,
promiseFactoryFactory: promiseFactoryFactory,
}
}
@ -157,13 +169,13 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
if qs == nil {
qs = &queueSet{
clock: qsc.factory.clock,
counter: qsc.factory.counter,
estimatedServiceTime: 60,
obsPair: qsc.obsPair,
qCfg: qsc.qCfg,
virtualTime: 0,
lastRealTime: qsc.factory.clock.Now(),
}
qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs)
}
qs.setConfiguration(qsc.qCfg, qsc.dealer, dCfg)
return qs
@ -240,6 +252,8 @@ const (
// executing at each point where there is a change in that quantity,
// because the metrics --- and only the metrics --- track that
// quantity per FlowSchema.
// The queueSet's promiseFactory is invoked once if the returns Request is non-nil,
// not invoked if the Request is nil.
func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
@ -286,39 +300,16 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo
// request from that queue.
qs.dispatchAsMuchAsPossibleLocked()
// ========================================================================
// Step 3:
// Set up a relay from the context's Done channel to the world
// of well-counted goroutines. We Are Told that every
// request's context's Done channel gets closed by the time
// the request is done being processed.
doneCh := ctx.Done()
// Retrieve the queueset configuration name while we have the lock
// and use it in the goroutine below.
configName := qs.qCfg.Name
if doneCh != nil {
qs.preCreateOrUnblockGoroutine()
go func() {
defer runtime.HandleCrash()
qs.goroutineDoneOrBlocked()
<-doneCh
// Whatever goroutine unblocked the preceding receive MUST
// have already either (a) incremented qs.counter or (b)
// known that said counter is not actually counting or (c)
// known that the count does not need to be accurate.
// BTW, the count only needs to be accurate in a test that
// uses FakeEventClock::Run().
klog.V(6).Infof("QS(%s): Context of request %q %#+v %#+v is Done", configName, fsName, descr1, descr2)
qs.cancelWait(req)
qs.goroutineDoneOrBlocked()
}()
}
return req, false
}
// ordinaryPromiseFactoryFactory is the promiseFactoryFactory that
// a queueSetFactory would ordinarily use.
// Test code might use something different.
func ordinaryPromiseFactoryFactory(qs *queueSet) promiseFactory {
return promise.NewWriteOnce
}
// Seats returns the number of seats this request requires.
func (req *request) Seats() int {
return int(req.workEstimate.Seats)
@ -348,41 +339,43 @@ func (req *request) Finish(execFn func()) bool {
func (req *request) wait() (bool, bool) {
qs := req.qs
qs.lock.Lock()
// ========================================================================
// Step 3:
// The final step is to wait on a decision from
// somewhere and then act on it.
decisionAny := req.decision.Get()
qs.lockAndSyncTime()
defer qs.lock.Unlock()
if req.waitStarted {
// This can not happen, because the client is forbidden to
// call Wait twice on the same request
panic(fmt.Sprintf("Multiple calls to the Wait method, QueueSet=%s, startTime=%s, descr1=%#+v, descr2=%#+v", req.qs.qCfg.Name, req.startTime, req.descr1, req.descr2))
klog.Errorf("Duplicate call to the Wait method! Immediately returning execute=false. QueueSet=%s, startTime=%s, descr1=%#+v, descr2=%#+v", req.qs.qCfg.Name, req.startTime, req.descr1, req.descr2)
return false, qs.isIdleLocked()
}
req.waitStarted = true
// ========================================================================
// Step 4:
// The final step is to wait on a decision from
// somewhere and then act on it.
decisionAny := req.decision.Get()
qs.syncTimeLocked()
decision, isDecision := decisionAny.(requestDecision)
if !isDecision {
panic(fmt.Sprintf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, req.descr1, req.descr2))
}
switch decision {
switch decisionAny {
case decisionReject:
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out")
return false, qs.isIdleLocked()
case decisionCancel:
// TODO(aaron-prindle) add metrics for this case
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
return false, qs.isIdleLocked()
case decisionExecute:
klog.V(5).Infof("QS(%s): Dispatching request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
return true, false
default:
// This can not happen, all possible values are handled above
panic(decision)
klog.Errorf("QS(%s): Impossible decision (type %T, value %#+v) for request %#+v %#+v! Treating as cancel", qs.qCfg.Name, decisionAny, decisionAny, req.descr1, req.descr2)
}
// TODO(aaron-prindle) add metrics for this case
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
// remove the request from the queue as it has timed out
req.removeFromQueueFn()
qs.totRequestsWaiting--
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false)
qs.obsPair.RequestsWaiting.Add(-1)
return false, qs.isIdleLocked()
}
func (qs *queueSet) IsIdle() bool {
@ -458,7 +451,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
fsName: fsName,
flowDistinguisher: flowDistinguisher,
ctx: ctx,
decision: promise.NewWriteOnce(&qs.lock, qs.counter),
decision: qs.promiseFactory(nil, ctx.Done(), decisionCancel),
arrivalTime: qs.clock.Now(),
queue: queue,
descr1: descr1,
@ -593,13 +586,12 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
flowDistinguisher: flowDistinguisher,
ctx: ctx,
startTime: now,
decision: promise.NewWriteOnce(&qs.lock, qs.counter),
decision: qs.promiseFactory(decisionExecute, ctx.Done(), decisionCancel),
arrivalTime: now,
descr1: descr1,
descr2: descr2,
workEstimate: *workEstimate,
}
req.decision.Set(decisionExecute)
qs.totRequestsExecuting++
qs.totSeatsInUse += req.Seats()
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
@ -652,26 +644,6 @@ func (qs *queueSet) dispatchLocked() bool {
return ok
}
// cancelWait ensures the request is not waiting. This is only
// applicable to a request that has been assigned to a queue.
func (qs *queueSet) cancelWait(req *request) {
qs.lock.Lock()
defer qs.lock.Unlock()
if req.decision.IsSet() {
// The request has already been removed from the queue
// and so we consider its wait to be over.
return
}
req.decision.Set(decisionCancel)
// remove the request from the queue as it has timed out
req.removeFromQueueFn()
qs.totRequestsWaiting--
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false)
qs.obsPair.RequestsWaiting.Add(-1)
}
// canAccommodateSeatsLocked returns true if this queueSet has enough
// seats available to accommodate a request with the given number of seats,
// otherwise it returns false.
@ -856,21 +828,6 @@ func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {
return keptQueues
}
// preCreateOrUnblockGoroutine needs to be called before creating a
// goroutine associated with this queueSet or unblocking a blocked
// one, to properly update the accounting used in testing.
func (qs *queueSet) preCreateOrUnblockGoroutine() {
qs.counter.Add(1)
}
// goroutineDoneOrBlocked needs to be called at the end of every
// goroutine associated with this queueSet or when such a goroutine is
// about to wait on some other goroutine to do something; this is to
// properly update the accounting used in testing.
func (qs *queueSet) goroutineDoneOrBlocked() {
qs.counter.Add(-1)
}
func (qs *queueSet) UpdateObservations() {
qs.obsPair.RequestsWaiting.Add(0)
qs.obsPair.RequestsExecuting.Add(0)

View File

@ -21,8 +21,11 @@ import (
"errors"
"fmt"
"math"
"os"
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@ -31,8 +34,10 @@ import (
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
testeventclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock"
testpromise "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/promise"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/klog/v2"
@ -307,11 +312,11 @@ func (uss *uniformScenarioState) finalReview() {
expectedRejects := ""
for i := range uss.clients {
fsName := fmt.Sprintf("client%d", i)
if atomic.AddInt32(&uss.executions[i], 0) > 0 {
if atomic.LoadInt32(&uss.executions[i]) > 0 {
uss.expectedExecuting = uss.expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
uss.expectedConcurrencyInUse = uss.expectedConcurrencyInUse + fmt.Sprintf(` apiserver_flowcontrol_request_concurrency_in_use{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
}
if atomic.AddInt32(&uss.rejects[i], 0) > 0 {
if atomic.LoadInt32(&uss.rejects[i]) > 0 {
expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flow_schema=%q,priority_level=%q,reason=%q} %d%s`, fsName, uss.name, uss.rejectReason, uss.rejects[i], "\n")
}
}
@ -353,8 +358,9 @@ func (uss *uniformScenarioState) finalReview() {
}
}
func init() {
func TestMain(m *testing.M) {
klog.InitFlags(nil)
os.Exit(m.Run())
}
// TestNoRestraint tests whether the no-restraint factory gives every client what it asks for
@ -388,7 +394,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestUniformFlowsHandSize1",
DesiredNumQueues: 9,
@ -425,7 +431,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestUniformFlowsHandSize3",
DesiredNumQueues: 8,
@ -461,7 +467,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "DiffFlowsExpectEqual",
DesiredNumQueues: 9,
@ -498,7 +504,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "DiffFlowsExpectUnequal",
DesiredNumQueues: 9,
@ -535,7 +541,7 @@ func TestWindup(t *testing.T) {
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestWindup",
DesiredNumQueues: 9,
@ -571,7 +577,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestDifferentFlowsWithoutQueuing",
DesiredNumQueues: 0,
@ -604,7 +610,7 @@ func TestTimeout(t *testing.T) {
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestTimeout",
DesiredNumQueues: 128,
@ -635,12 +641,27 @@ func TestTimeout(t *testing.T) {
}.exercise(t)
}
// TestContextCancel tests cancellation of a request's context.
// The outline is:
// 1. Use a concurrency limit of 1.
// 2. Start request 1.
// 3. Use a fake clock for the following logic, to insulate from scheduler noise.
// 4. The exec fn of request 1 starts request 2, which should wait
// in its queue.
// 5. The exec fn of request 1 also forks a goroutine that waits 1 second
// and then cancels the context of request 2.
// 6. The exec fn of request 1, if StartRequest 2 returns a req2 (which is the normal case),
// calls `req2.Finish`, which is expected to return after the context cancel.
// 7. The queueset interface allows StartRequest 2 to return `nil` in this situation,
// if the scheduler gets the cancel done before StartRequest finishes;
// the test handles this without regard to whether the implementation will ever do that.
// 8. Check that the above took exactly 1 second.
func TestContextCancel(t *testing.T) {
metrics.Register()
metrics.Reset()
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestContextCancel",
DesiredNumQueues: 11,
@ -653,59 +674,80 @@ func TestContextCancel(t *testing.T) {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
counter.Add(1) // account for the goroutine running this test
counter.Add(1) // account for main activity of the goroutine running this test
ctx1 := context.Background()
b2i := map[bool]int{false: 0, true: 1}
var qnc [2][2]int32
req1, _ := qs.StartRequest(ctx1, &fcrequest.WorkEstimate{Seats: 1}, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) })
pZero := func() *int32 { var zero int32; return &zero }
// counts of calls to the QueueNoteFns
queueNoteCounts := map[int]map[bool]*int32{
1: {false: pZero(), true: pZero()},
2: {false: pZero(), true: pZero()},
}
queueNoteFn := func(fn int) func(inQueue bool) {
return func(inQueue bool) { atomic.AddInt32(queueNoteCounts[fn][inQueue], 1) }
}
fatalErrs := []string{}
var errsLock sync.Mutex
expectQNCount := func(fn int, inQueue bool, expect int32) {
if a := atomic.LoadInt32(queueNoteCounts[fn][inQueue]); a != expect {
errsLock.Lock()
defer errsLock.Unlock()
fatalErrs = append(fatalErrs, fmt.Sprintf("Got %d calls to queueNoteFn%d(%v), expected %d", a, fn, inQueue, expect))
}
}
expectQNCounts := func(fn int, expectF, expectT int32) {
expectQNCount(fn, false, expectF)
expectQNCount(fn, true, expectT)
}
req1, _ := qs.StartRequest(ctx1, &fcrequest.WorkEstimate{Seats: 1}, 1, "", "fs1", "test", "one", queueNoteFn(1))
if req1 == nil {
t.Error("Request rejected")
return
}
if a := atomic.AddInt32(&qnc[0][0], 0); a != 1 {
t.Errorf("Got %d calls to queueNoteFn1(false), expected 1", a)
expectQNCounts(1, 1, 1)
var executed1, idle1 bool
counter.Add(1) // account for the following goroutine
go func() {
defer counter.Add(-1) // account completion of this goroutine
idle1 = req1.Finish(func() {
executed1 = true
ctx2, cancel2 := context.WithCancel(context.Background())
tBefore := clk.Now()
counter.Add(1) // account for the following goroutine
go func() {
defer counter.Add(-1) // account completion of this goroutine
clk.Sleep(time.Second)
expectQNCounts(2, 0, 1)
// account for unblocking the goroutine that waits on cancelation
counter.Add(1)
cancel2()
}()
req2, idle2a := qs.StartRequest(ctx2, &fcrequest.WorkEstimate{Seats: 1}, 2, "", "fs2", "test", "two", queueNoteFn(2))
if idle2a {
t.Error("2nd StartRequest returned idle")
}
if req2 != nil {
idle2b := req2.Finish(func() {
t.Error("Executing req2")
})
if idle2b {
t.Error("2nd Finish returned idle")
}
expectQNCounts(2, 1, 1)
}
tAfter := clk.Now()
dt := tAfter.Sub(tBefore)
if dt != time.Second {
t.Errorf("Unexpected: dt=%d", dt)
}
})
}()
counter.Add(-1) // completion of main activity of goroutine running this test
clk.Run(nil)
errsLock.Lock()
defer errsLock.Unlock()
if len(fatalErrs) > 0 {
t.Error(strings.Join(fatalErrs, "; "))
}
if a := atomic.AddInt32(&qnc[0][1], 0); a != 1 {
t.Errorf("Got %d calls to queueNoteFn1(true), expected 1", a)
}
var executed1 bool
idle1 := req1.Finish(func() {
executed1 = true
ctx2, cancel2 := context.WithCancel(context.Background())
tBefore := time.Now()
go func() {
time.Sleep(time.Second)
if a := atomic.AddInt32(&qnc[1][0], 0); a != 0 {
t.Errorf("Got %d calls to queueNoteFn2(false), expected 0", a)
}
if a := atomic.AddInt32(&qnc[1][1], 0); a != 1 {
t.Errorf("Got %d calls to queueNoteFn2(true), expected 1", a)
}
// account for unblocking the goroutine that waits on cancelation
counter.Add(1)
cancel2()
}()
req2, idle2a := qs.StartRequest(ctx2, &fcrequest.WorkEstimate{Seats: 1}, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) })
if idle2a {
t.Error("2nd StartRequest returned idle")
}
if req2 != nil {
idle2b := req2.Finish(func() {
t.Error("Executing req2")
})
if idle2b {
t.Error("2nd Finish returned idle")
}
if a := atomic.AddInt32(&qnc[1][0], 0); a != 1 {
t.Errorf("Got %d calls to queueNoteFn2(false), expected 1", a)
}
}
tAfter := time.Now()
dt := tAfter.Sub(tBefore)
if dt < time.Second || dt > 2*time.Second {
t.Errorf("Unexpected: dt=%d", dt)
}
})
if !executed1 {
t.Errorf("Unexpected: executed1=%v", executed1)
}
@ -714,12 +756,20 @@ func TestContextCancel(t *testing.T) {
}
}
func countingPromiseFactoryFactory(activeCounter counter.GoRoutineCounter) promiseFactoryFactory {
return func(qs *queueSet) promiseFactory {
return func(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promise.WriteOnce {
return testpromise.NewCountingWriteOnce(activeCounter, &qs.lock, initial, doneCh, doneVal)
}
}
}
func TestTotalRequestsExecutingWithPanic(t *testing.T) {
metrics.Register()
metrics.Reset()
now := time.Now()
clk, counter := testeventclock.NewFake(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter))
qCfg := fq.QueuingConfig{
Name: "TestTotalRequestsExecutingWithPanic",
DesiredNumQueues: 0,

View File

@ -50,11 +50,10 @@ type request struct {
// decision gets set to a `requestDecision` indicating what to do
// with this request. It gets set exactly once, when the request
// is removed from its queue. The value will be decisionReject,
// decisionCancel, or decisionExecute; decisionTryAnother never
// appears here.
// decisionCancel, or decisionExecute.
//
// The field is NOT thread-safe and should be protected by the
// queueset's lock.
// decision.Set is called with the queueSet locked.
// decision.Get is called without the queueSet locked.
decision promise.WriteOnce
// arrivalTime is the real time when the request entered this system

View File

@ -0,0 +1,105 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package promise
import (
"sync"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
promiseifc "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
)
// countingPromise implements the WriteOnce interface.
// This implementation is based on a condition variable.
// This implementation tracks active goroutines:
// the given counter is decremented for a goroutine waiting for this
// varible to be set and incremented when such a goroutine is
// unblocked.
type countingPromise struct {
lock sync.Locker
cond sync.Cond
activeCounter counter.GoRoutineCounter // counter of active goroutines
waitingCount int // number of goroutines idle due to this being unset
isSet bool
value interface{}
}
var _ promiseifc.WriteOnce = &countingPromise{}
// NewCountingWriteOnce creates a WriteOnce that uses locking and counts goroutine activity.
//
// The final three arguments are like those for a regular WriteOnce factory:
// - an optional initial value,
// - an optional "done" channel,
// - the value that is Set after the "done" channel becomes selectable.
// Note that for this implementation, the reaction to `doneCh`
// becoming selectable does not wait for a Get.
// If `doneCh != nil` then the caller promises to close it reasonably promptly
// (to the degree allowed by the Go runtime scheduler), and increment the
// goroutine counter before that.
// The WriteOnce's Get method must be called without the lock held.
// The WriteOnce's Set method must be called with the lock held.
func NewCountingWriteOnce(activeCounter counter.GoRoutineCounter, lock sync.Locker, initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promiseifc.WriteOnce {
p := &countingPromise{
lock: lock,
cond: *sync.NewCond(lock),
activeCounter: activeCounter,
isSet: initial != nil,
value: initial,
}
if doneCh != nil {
activeCounter.Add(1) // count start of the following goroutine
go func() {
defer activeCounter.Add(-1) // count completion of this goroutine
defer runtime.HandleCrash()
activeCounter.Add(-1) // count suspension for channel receive
<-doneCh
// Whatever goroutine unblocked the preceding receive MUST
// have already accounted for this activation.
lock.Lock()
defer lock.Unlock()
p.Set(doneVal)
}()
}
return p
}
func (p *countingPromise) Get() interface{} {
p.lock.Lock()
defer p.lock.Unlock()
if !p.isSet {
p.waitingCount++
p.activeCounter.Add(-1)
p.cond.Wait()
}
return p.value
}
func (p *countingPromise) Set(value interface{}) bool {
if p.isSet {
return false
}
p.isSet = true
p.value = value
if p.waitingCount > 0 {
p.activeCounter.Add(p.waitingCount)
p.waitingCount = 0
p.cond.Broadcast()
}
return true
}

View File

@ -0,0 +1,164 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package promise
import (
"context"
"os"
"sync"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
testeventclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock"
"k8s.io/klog/v2"
)
func TestMain(m *testing.M) {
klog.InitFlags(nil)
os.Exit(m.Run())
}
func TestCountingWriteOnceSet(t *testing.T) {
oldTime := time.Now()
cval := &oldTime
doneCh := make(chan struct{})
now := time.Now()
clock, counter := testeventclock.NewFake(now, 0, nil)
var lock sync.Mutex
wr := NewCountingWriteOnce(counter, &lock, nil, doneCh, cval)
gots := make(chan interface{}, 1)
goGetExpectNotYet(t, clock, counter, wr, gots, "Set")
aval := &now
func() {
lock.Lock()
defer lock.Unlock()
if !wr.Set(aval) {
t.Error("Set() returned false")
}
}()
clock.Run(nil)
expectGotValue(t, gots, aval)
goGetAndExpect(t, clock, counter, wr, gots, aval)
later := time.Now()
bval := &later
func() {
lock.Lock()
defer lock.Unlock()
if wr.Set(bval) {
t.Error("second Set() returned true")
}
}()
goGetAndExpect(t, clock, counter, wr, gots, aval)
counter.Add(1) // account for unblocking the receive on doneCh
close(doneCh)
time.Sleep(time.Second) // give it a chance to misbehave
goGetAndExpect(t, clock, counter, wr, gots, aval)
}
func TestCountingWriteOnceCancel(t *testing.T) {
oldTime := time.Now()
cval := &oldTime
clock, counter := testeventclock.NewFake(oldTime, 0, nil)
ctx, cancel := context.WithCancel(context.Background())
var lock sync.Mutex
wr := NewCountingWriteOnce(counter, &lock, nil, ctx.Done(), cval)
gots := make(chan interface{}, 1)
goGetExpectNotYet(t, clock, counter, wr, gots, "cancel")
counter.Add(1) // account for unblocking the receive on doneCh
cancel()
clock.Run(nil)
expectGotValue(t, gots, cval)
goGetAndExpect(t, clock, counter, wr, gots, cval)
later := time.Now()
bval := &later
func() {
lock.Lock()
defer lock.Unlock()
if wr.Set(bval) {
t.Error("Set() after cancel returned true")
}
}()
goGetAndExpect(t, clock, counter, wr, gots, cval)
}
func TestCountingWriteOnceInitial(t *testing.T) {
oldTime := time.Now()
cval := &oldTime
clock, counter := testeventclock.NewFake(oldTime, 0, nil)
ctx, cancel := context.WithCancel(context.Background())
var lock sync.Mutex
now := time.Now()
aval := &now
wr := NewCountingWriteOnce(counter, &lock, aval, ctx.Done(), cval)
gots := make(chan interface{}, 1)
goGetAndExpect(t, clock, counter, wr, gots, aval)
goGetAndExpect(t, clock, counter, wr, gots, aval) // check that a set value stays set
later := time.Now()
bval := &later
func() {
lock.Lock()
defer lock.Unlock()
if wr.Set(bval) {
t.Error("Set of initialized promise returned true")
}
}()
goGetAndExpect(t, clock, counter, wr, gots, aval)
counter.Add(1) // account for unblocking receive on doneCh
cancel()
time.Sleep(time.Second) // give it a chance to misbehave
goGetAndExpect(t, clock, counter, wr, gots, aval)
}
func goGetExpectNotYet(t *testing.T, clk *testeventclock.Fake, grc counter.GoRoutineCounter, wr promise.WriteOnce, gots chan interface{}, trigger string) {
grc.Add(1) // count the following goroutine
go func() {
defer grc.Add(-1) // count completion of this goroutine
gots <- wr.Get()
}()
clk.Run(nil)
select {
case <-gots:
t.Errorf("Get returned before %s", trigger)
case <-time.After(time.Second):
t.Log("Good: Get did not return yet")
}
}
func goGetAndExpect(t *testing.T, clk *testeventclock.Fake, grc counter.GoRoutineCounter, wr promise.WriteOnce, gots chan interface{}, expected interface{}) {
grc.Add(1)
go func() {
defer grc.Add(-1)
gots <- wr.Get()
}()
clk.Run(nil)
expectGotValue(t, gots, expected)
}
func expectGotValue(t *testing.T, gots <-chan interface{}, expected interface{}) {
select {
case gotVal := <-gots:
t.Logf("Got %v", gotVal)
if gotVal != expected {
t.Errorf("Get returned %v, expected: %v", gotVal, expected)
}
case <-time.After(wait.ForeverTestTimeout):
t.Error("Get did not return")
}
}