diff --git a/pkg/util/flowcontrol/apf_filter.go b/pkg/util/flowcontrol/apf_filter.go index 11e709ca6..2268f6c94 100644 --- a/pkg/util/flowcontrol/apf_filter.go +++ b/pkg/util/flowcontrol/apf_filter.go @@ -22,7 +22,6 @@ import ( "time" "k8s.io/apiserver/pkg/server/mux" - "k8s.io/apiserver/pkg/util/flowcontrol/counter" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock" fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset" @@ -83,7 +82,6 @@ func New( serverConcurrencyLimit int, requestWaitLimit time.Duration, ) Interface { - grc := counter.NoOp{} clk := eventclock.Real{} return NewTestable(TestableConfig{ Name: "Controller", @@ -95,7 +93,7 @@ func New( ServerConcurrencyLimit: serverConcurrencyLimit, RequestWaitLimit: requestWaitLimit, ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, - QueueSetFactory: fqs.NewQueueSetFactory(clk, grc), + QueueSetFactory: fqs.NewQueueSetFactory(clk), }) } diff --git a/pkg/util/flowcontrol/counter/noop.go b/pkg/util/flowcontrol/counter/noop.go deleted file mode 100644 index fa946f6f0..000000000 --- a/pkg/util/flowcontrol/counter/noop.go +++ /dev/null @@ -1,25 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package counter - -// NoOp is a GoRoutineCounter that does not actually count -type NoOp struct{} - -var _ GoRoutineCounter = NoOp{} - -// Add would adjust the count, if a count were being kept -func (NoOp) Add(int) {} diff --git a/pkg/util/flowcontrol/fairqueuing/promise/interface.go b/pkg/util/flowcontrol/fairqueuing/promise/interface.go index 58af45949..b2e3adbdc 100644 --- a/pkg/util/flowcontrol/fairqueuing/promise/interface.go +++ b/pkg/util/flowcontrol/fairqueuing/promise/interface.go @@ -19,17 +19,12 @@ package promise // WriteOnce represents a variable that is initially not set and can // be set once and is readable. This is the common meaning for // "promise". -// The implementations of this interface are NOT thread-safe. type WriteOnce interface { - // Get reads the current value of this variable. If this variable - // is not set yet then this call blocks until this variable gets a - // value. + // Get reads the current value of this variable. If this + // variable is not set yet then this call blocks until this + // variable gets a value. Get() interface{} - // IsSet returns immediately with an indication of whether this - // variable has been set. - IsSet() bool - // Set normally writes a value into this variable, unblocks every // goroutine waiting for this variable to have a value, and // returns true. In the unhappy case that this variable is diff --git a/pkg/util/flowcontrol/fairqueuing/promise/promise.go b/pkg/util/flowcontrol/fairqueuing/promise/promise.go index 70059e827..d3bda40aa 100644 --- a/pkg/util/flowcontrol/fairqueuing/promise/promise.go +++ b/pkg/util/flowcontrol/fairqueuing/promise/promise.go @@ -18,57 +18,53 @@ package promise import ( "sync" - - "k8s.io/apiserver/pkg/util/flowcontrol/counter" ) -// promise implements the promise.WriteOnce interface. -// This implementation is based on a condition variable. 
-// This implementation tracks active goroutines: -// the given counter is decremented for a goroutine waiting for this -// varible to be set and incremented when such a goroutine is -// unblocked. +// promise implements the WriteOnce interface. type promise struct { - cond sync.Cond - activeCounter counter.GoRoutineCounter // counter of active goroutines - waitingCount int // number of goroutines idle due to this being unset - isSet bool - value interface{} + doneCh <-chan struct{} + doneVal interface{} + setCh chan struct{} + onceler sync.Once + value interface{} } var _ WriteOnce = &promise{} -// NewWriteOnce makes a new promise.LockingWriteOnce -func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) WriteOnce { - return &promise{ - cond: *sync.NewCond(lock), - activeCounter: activeCounter, +// NewWriteOnce makes a new thread-safe WriteOnce. +// +// If `initial` is non-nil then that value is Set at creation time. +// +// If a `Get` is waiting soon after `doneCh` becomes selectable (which +// never happens for the nil channel) then `Set(doneVal)` effectively +// happens at that time. 
+func NewWriteOnce(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) WriteOnce { + p := &promise{ + doneCh: doneCh, + doneVal: doneVal, + setCh: make(chan struct{}), } + if initial != nil { + p.Set(initial) + } + return p } func (p *promise) Get() interface{} { - if !p.isSet { - p.waitingCount++ - p.activeCounter.Add(-1) - p.cond.Wait() + select { + case <-p.setCh: + case <-p.doneCh: + p.Set(p.doneVal) } return p.value } -func (p *promise) IsSet() bool { - return p.isSet -} - func (p *promise) Set(value interface{}) bool { - if p.isSet { - return false - } - p.isSet = true - p.value = value - if p.waitingCount > 0 { - p.activeCounter.Add(p.waitingCount) - p.waitingCount = 0 - p.cond.Broadcast() - } - return true + var ans bool + p.onceler.Do(func() { + p.value = value + close(p.setCh) + ans = true + }) + return ans } diff --git a/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go b/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go index 83f182c3e..e3864c2df 100644 --- a/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go +++ b/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go @@ -17,105 +17,103 @@ limitations under the License. 
package promise import ( - "sync" - "sync/atomic" + "context" "testing" "time" - testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock" + "k8s.io/apimachinery/pkg/util/wait" ) -func TestLockingWriteOnce(t *testing.T) { +func TestWriteOnceSet(t *testing.T) { + oldTime := time.Now() + cval := &oldTime + ctx, cancel := context.WithCancel(context.Background()) + wr := NewWriteOnce(nil, ctx.Done(), cval) + gots := make(chan interface{}) + goGetExpectNotYet(t, wr, gots, "Set") now := time.Now() - clock, counter := testclock.NewFake(now, 0, nil) - var lock sync.Mutex - wr := NewWriteOnce(&lock, counter) - var gots int32 - var got atomic.Value - counter.Add(1) - go func() { - lock.Lock() - defer lock.Unlock() - - got.Store(wr.Get()) - atomic.AddInt32(&gots, 1) - counter.Add(-1) - }() - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 0 { - t.Error("Get returned before Set") - } - func() { - lock.Lock() - defer lock.Unlock() - if wr.IsSet() { - t.Error("IsSet before Set") - } - }() aval := &now - func() { - lock.Lock() - defer lock.Unlock() - if !wr.Set(aval) { - t.Error("Set() returned false") - } - }() - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 1 { - t.Error("Get did not return after Set") + if !wr.Set(aval) { + t.Error("Set() returned false") } - if got.Load() != aval { - t.Error("Get did not return what was Set") - } - func() { - lock.Lock() - defer lock.Unlock() - if !wr.IsSet() { - t.Error("IsSet()==false after Set") - } - }() - counter.Add(1) - go func() { - lock.Lock() - defer lock.Unlock() - - got.Store(wr.Get()) - atomic.AddInt32(&gots, 1) - counter.Add(-1) - }() - clock.Run(nil) - time.Sleep(time.Second) - if atomic.LoadInt32(&gots) != 2 { - t.Error("Second Get did not return immediately") - } - if got.Load() != aval { - t.Error("Second Get did not return what was Set") - } - func() { - lock.Lock() - defer lock.Unlock() - if !wr.IsSet() { - t.Error("IsSet()==false after second Get") 
- } - }() + expectGotValue(t, gots, aval) + goGetAndExpect(t, wr, gots, aval) later := time.Now() bval := &later - func() { - lock.Lock() - defer lock.Unlock() - if wr.Set(bval) { - t.Error("second Set() returned true") - } - }() - func() { - lock.Lock() - defer lock.Unlock() - if !wr.IsSet() { - t.Error("IsSet() returned false after second set") - } else if wr.Get() != aval { - t.Error("Get() after second Set returned wrong value") - } - }() + if wr.Set(bval) { + t.Error("second Set() returned true") + } + goGetAndExpect(t, wr, gots, aval) + cancel() + time.Sleep(time.Second) // give it a chance to misbehave + goGetAndExpect(t, wr, gots, aval) +} + +func TestWriteOnceCancel(t *testing.T) { + oldTime := time.Now() + cval := &oldTime + ctx, cancel := context.WithCancel(context.Background()) + wr := NewWriteOnce(nil, ctx.Done(), cval) + gots := make(chan interface{}) + goGetExpectNotYet(t, wr, gots, "cancel") + cancel() + expectGotValue(t, gots, cval) + goGetAndExpect(t, wr, gots, cval) + later := time.Now() + bval := &later + if wr.Set(bval) { + t.Error("Set() after cancel returned true") + } + goGetAndExpect(t, wr, gots, cval) +} + +func TestWriteOnceInitial(t *testing.T) { + oldTime := time.Now() + cval := &oldTime + ctx, cancel := context.WithCancel(context.Background()) + now := time.Now() + aval := &now + wr := NewWriteOnce(aval, ctx.Done(), cval) + gots := make(chan interface{}) + goGetAndExpect(t, wr, gots, aval) + later := time.Now() + bval := &later + if wr.Set(bval) { + t.Error("Set of initialized promise returned true") + } + goGetAndExpect(t, wr, gots, aval) + cancel() + time.Sleep(time.Second) // give it a chance to misbehave + goGetAndExpect(t, wr, gots, aval) +} + +func goGetExpectNotYet(t *testing.T, wr WriteOnce, gots chan interface{}, trigger string) { + go func() { + gots <- wr.Get() + }() + select { + case <-gots: + t.Errorf("Get returned before %s", trigger) + case <-time.After(time.Second): + t.Log("Good: Get did not return yet") + } +} + +func 
goGetAndExpect(t *testing.T, wr WriteOnce, gots chan interface{}, expected interface{}) { + go func() { + gots <- wr.Get() + }() + expectGotValue(t, gots, expected) +} + +func expectGotValue(t *testing.T, gots <-chan interface{}, expected interface{}) { + select { + case gotVal := <-gots: + t.Logf("Got %v", gotVal) + if gotVal != expected { + t.Errorf("Get returned %v, expected: %v", gotVal, expected) + } + case <-time.After(wait.ForeverTestTimeout): + t.Error("Get did not return") + } } diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 6b9937a3b..0738c1fde 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -23,8 +23,6 @@ import ( "sync" "time" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apiserver/pkg/util/flowcontrol/counter" "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock" @@ -45,10 +43,18 @@ const nsTimeFmt = "2006-01-02 15:04:05.000000000" // queueSetFactory implements the QueueSetFactory interface // queueSetFactory makes QueueSet objects. type queueSetFactory struct { - counter counter.GoRoutineCounter - clock eventclock.Interface + clock eventclock.Interface + promiseFactoryFactory promiseFactoryFactory } +// promiseFactory returns a WriteOnce +// - whose Set method is invoked with the queueSet locked, and +// - whose Get method is invoked with the queueSet not locked. +type promiseFactory func(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promise.WriteOnce + +// promiseFactoryFactory returns the promiseFactory to use for the given queueSet +type promiseFactoryFactory func(*queueSet) promiseFactory + // `*queueSetCompleter` implements QueueSetCompleter. Exactly one of // the fields `factory` and `theSet` is non-nil. 
type queueSetCompleter struct { @@ -61,8 +67,8 @@ type queueSetCompleter struct { // queueSet implements the Fair Queuing for Server Requests technique // described in this package's doc, and a pointer to one implements -// the QueueSet interface. The clock, GoRoutineCounter, and estimated -// service time should not be changed; the fields listed after the +// the QueueSet interface. The fields listed before the lock +// should not be changed; the fields listed after the // lock must be accessed only while holding the lock. The methods of // this type follow the naming convention that the suffix "Locked" // means the caller must hold the lock; for a method whose name does @@ -70,10 +76,11 @@ type queueSetCompleter struct { // locking. type queueSet struct { clock eventclock.Interface - counter counter.GoRoutineCounter estimatedServiceTime float64 obsPair metrics.TimedObserverPair + promiseFactory promiseFactory + lock sync.Mutex // qCfg holds the current queuing configuration. Its @@ -119,10 +126,15 @@ type queueSet struct { } // NewQueueSetFactory creates a new QueueSetFactory object -func NewQueueSetFactory(c eventclock.Interface, counter counter.GoRoutineCounter) fq.QueueSetFactory { +func NewQueueSetFactory(c eventclock.Interface) fq.QueueSetFactory { + return newTestableQueueSetFactory(c, ordinaryPromiseFactoryFactory) +} + +// newTestableQueueSetFactory creates a new QueueSetFactory object with the given promiseFactoryFactory +func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory promiseFactoryFactory) fq.QueueSetFactory { return &queueSetFactory{ - counter: counter, - clock: c, + clock: c, + promiseFactoryFactory: promiseFactoryFactory, } } @@ -157,13 +169,13 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet { if qs == nil { qs = &queueSet{ clock: qsc.factory.clock, - counter: qsc.factory.counter, estimatedServiceTime: 60, obsPair: qsc.obsPair, qCfg: qsc.qCfg, virtualTime: 0, lastRealTime: 
qsc.factory.clock.Now(), } + qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs) } qs.setConfiguration(qsc.qCfg, qsc.dealer, dCfg) return qs @@ -240,6 +252,8 @@ const ( // executing at each point where there is a change in that quantity, // because the metrics --- and only the metrics --- track that // quantity per FlowSchema. +// The queueSet's promiseFactory is invoked once if the returned Request is non-nil, +// not invoked if the Request is nil. func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { qs.lockAndSyncTime() defer qs.lock.Unlock() @@ -286,39 +300,16 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo // request from that queue. qs.dispatchAsMuchAsPossibleLocked() - // ======================================================================== - // Step 3: - - // Set up a relay from the context's Done channel to the world - // of well-counted goroutines. We Are Told that every - // request's context's Done channel gets closed by the time - // the request is done being processed. - doneCh := ctx.Done() - - // Retrieve the queueset configuration name while we have the lock - // and use it in the goroutine below. - configName := qs.qCfg.Name - - if doneCh != nil { - qs.preCreateOrUnblockGoroutine() - go func() { - defer runtime.HandleCrash() - qs.goroutineDoneOrBlocked() - <-doneCh - // Whatever goroutine unblocked the preceding receive MUST - // have already either (a) incremented qs.counter or (b) - // known that said counter is not actually counting or (c) - // known that the count does not need to be accurate. - // BTW, the count only needs to be accurate in a test that - // uses FakeEventClock::Run(). 
- klog.V(6).Infof("QS(%s): Context of request %q %#+v %#+v is Done", configName, fsName, descr1, descr2) - qs.cancelWait(req) - qs.goroutineDoneOrBlocked() - }() - } return req, false } +// ordinaryPromiseFactoryFactory is the promiseFactoryFactory that +// a queueSetFactory would ordinarily use. +// Test code might use something different. +func ordinaryPromiseFactoryFactory(qs *queueSet) promiseFactory { + return promise.NewWriteOnce +} + // Seats returns the number of seats this request requires. func (req *request) Seats() int { return int(req.workEstimate.Seats) @@ -348,41 +339,43 @@ func (req *request) Finish(execFn func()) bool { func (req *request) wait() (bool, bool) { qs := req.qs - qs.lock.Lock() + + // ======================================================================== + // Step 3: + // The final step is to wait on a decision from + // somewhere and then act on it. + decisionAny := req.decision.Get() + qs.lockAndSyncTime() defer qs.lock.Unlock() if req.waitStarted { // This can not happen, because the client is forbidden to // call Wait twice on the same request - panic(fmt.Sprintf("Multiple calls to the Wait method, QueueSet=%s, startTime=%s, descr1=%#+v, descr2=%#+v", req.qs.qCfg.Name, req.startTime, req.descr1, req.descr2)) + klog.Errorf("Duplicate call to the Wait method! Immediately returning execute=false. QueueSet=%s, startTime=%s, descr1=%#+v, descr2=%#+v", req.qs.qCfg.Name, req.startTime, req.descr1, req.descr2) + return false, qs.isIdleLocked() } req.waitStarted = true - - // ======================================================================== - // Step 4: - // The final step is to wait on a decision from - // somewhere and then act on it. 
- decisionAny := req.decision.Get() - qs.syncTimeLocked() - decision, isDecision := decisionAny.(requestDecision) - if !isDecision { - panic(fmt.Sprintf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, req.descr1, req.descr2)) - } - switch decision { + switch decisionAny { case decisionReject: klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2) metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out") return false, qs.isIdleLocked() case decisionCancel: - // TODO(aaron-prindle) add metrics for this case - klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2) - return false, qs.isIdleLocked() case decisionExecute: klog.V(5).Infof("QS(%s): Dispatching request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2) return true, false default: // This can not happen, all possible values are handled above - panic(decision) + klog.Errorf("QS(%s): Impossible decision (type %T, value %#+v) for request %#+v %#+v! 
Treating as cancel", qs.qCfg.Name, decisionAny, decisionAny, req.descr1, req.descr2) } + // TODO(aaron-prindle) add metrics for this case + klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2) + // remove the request from the queue as it has timed out + req.removeFromQueueFn() + qs.totRequestsWaiting-- + metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) + req.NoteQueued(false) + qs.obsPair.RequestsWaiting.Add(-1) + return false, qs.isIdleLocked() } func (qs *queueSet) IsIdle() bool { @@ -458,7 +451,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte fsName: fsName, flowDistinguisher: flowDistinguisher, ctx: ctx, - decision: promise.NewWriteOnce(&qs.lock, qs.counter), + decision: qs.promiseFactory(nil, ctx.Done(), decisionCancel), arrivalTime: qs.clock.Now(), queue: queue, descr1: descr1, @@ -593,13 +586,12 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f flowDistinguisher: flowDistinguisher, ctx: ctx, startTime: now, - decision: promise.NewWriteOnce(&qs.lock, qs.counter), + decision: qs.promiseFactory(decisionExecute, ctx.Done(), decisionCancel), arrivalTime: now, descr1: descr1, descr2: descr2, workEstimate: *workEstimate, } - req.decision.Set(decisionExecute) qs.totRequestsExecuting++ qs.totSeatsInUse += req.Seats() metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) @@ -652,26 +644,6 @@ func (qs *queueSet) dispatchLocked() bool { return ok } -// cancelWait ensures the request is not waiting. This is only -// applicable to a request that has been assigned to a queue. -func (qs *queueSet) cancelWait(req *request) { - qs.lock.Lock() - defer qs.lock.Unlock() - if req.decision.IsSet() { - // The request has already been removed from the queue - // and so we consider its wait to be over. 
- return - } - req.decision.Set(decisionCancel) - - // remove the request from the queue as it has timed out - req.removeFromQueueFn() - qs.totRequestsWaiting-- - metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) - req.NoteQueued(false) - qs.obsPair.RequestsWaiting.Add(-1) -} - // canAccommodateSeatsLocked returns true if this queueSet has enough // seats available to accommodate a request with the given number of seats, // otherwise it returns false. @@ -856,21 +828,6 @@ func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue { return keptQueues } -// preCreateOrUnblockGoroutine needs to be called before creating a -// goroutine associated with this queueSet or unblocking a blocked -// one, to properly update the accounting used in testing. -func (qs *queueSet) preCreateOrUnblockGoroutine() { - qs.counter.Add(1) -} - -// goroutineDoneOrBlocked needs to be called at the end of every -// goroutine associated with this queueSet or when such a goroutine is -// about to wait on some other goroutine to do something; this is to -// properly update the accounting used in testing. 
-func (qs *queueSet) goroutineDoneOrBlocked() { - qs.counter.Add(-1) -} - func (qs *queueSet) UpdateObservations() { qs.obsPair.RequestsWaiting.Add(0) qs.obsPair.RequestsExecuting.Add(0) diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 44b36d222..485565f88 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -21,8 +21,11 @@ import ( "errors" "fmt" "math" + "os" "reflect" "sort" + "strings" + "sync" "sync/atomic" "testing" "time" @@ -31,8 +34,10 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/counter" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" testeventclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock" + testpromise "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/promise" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/klog/v2" @@ -307,11 +312,11 @@ func (uss *uniformScenarioState) finalReview() { expectedRejects := "" for i := range uss.clients { fsName := fmt.Sprintf("client%d", i) - if atomic.AddInt32(&uss.executions[i], 0) > 0 { + if atomic.LoadInt32(&uss.executions[i]) > 0 { uss.expectedExecuting = uss.expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n") uss.expectedConcurrencyInUse = uss.expectedConcurrencyInUse + fmt.Sprintf(` apiserver_flowcontrol_request_concurrency_in_use{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n") } - if atomic.AddInt32(&uss.rejects[i], 0) > 0 { + if atomic.LoadInt32(&uss.rejects[i]) > 0 { expectedRejects = expectedRejects + fmt.Sprintf(` 
apiserver_flowcontrol_rejected_requests_total{flow_schema=%q,priority_level=%q,reason=%q} %d%s`, fsName, uss.name, uss.rejectReason, uss.rejects[i], "\n") } } @@ -353,8 +358,9 @@ func (uss *uniformScenarioState) finalReview() { } } -func init() { +func TestMain(m *testing.M) { klog.InitFlags(nil) + os.Exit(m.Run()) } // TestNoRestraint tests whether the no-restraint factory gives every client what it asks for @@ -388,7 +394,7 @@ func TestUniformFlowsHandSize1(t *testing.T) { now := time.Now() clk, counter := testeventclock.NewFake(now, 0, nil) - qsf := NewQueueSetFactory(clk, counter) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) qCfg := fq.QueuingConfig{ Name: "TestUniformFlowsHandSize1", DesiredNumQueues: 9, @@ -425,7 +431,7 @@ func TestUniformFlowsHandSize3(t *testing.T) { now := time.Now() clk, counter := testeventclock.NewFake(now, 0, nil) - qsf := NewQueueSetFactory(clk, counter) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) qCfg := fq.QueuingConfig{ Name: "TestUniformFlowsHandSize3", DesiredNumQueues: 8, @@ -461,7 +467,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) { now := time.Now() clk, counter := testeventclock.NewFake(now, 0, nil) - qsf := NewQueueSetFactory(clk, counter) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) qCfg := fq.QueuingConfig{ Name: "DiffFlowsExpectEqual", DesiredNumQueues: 9, @@ -498,7 +504,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) { now := time.Now() clk, counter := testeventclock.NewFake(now, 0, nil) - qsf := NewQueueSetFactory(clk, counter) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) qCfg := fq.QueuingConfig{ Name: "DiffFlowsExpectUnequal", DesiredNumQueues: 9, @@ -535,7 +541,7 @@ func TestWindup(t *testing.T) { now := time.Now() clk, counter := testeventclock.NewFake(now, 0, nil) - qsf := NewQueueSetFactory(clk, counter) + qsf := newTestableQueueSetFactory(clk, 
countingPromiseFactoryFactory(counter)) qCfg := fq.QueuingConfig{ Name: "TestWindup", DesiredNumQueues: 9, @@ -571,7 +577,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { now := time.Now() clk, counter := testeventclock.NewFake(now, 0, nil) - qsf := NewQueueSetFactory(clk, counter) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) qCfg := fq.QueuingConfig{ Name: "TestDifferentFlowsWithoutQueuing", DesiredNumQueues: 0, @@ -604,7 +610,7 @@ func TestTimeout(t *testing.T) { now := time.Now() clk, counter := testeventclock.NewFake(now, 0, nil) - qsf := NewQueueSetFactory(clk, counter) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) qCfg := fq.QueuingConfig{ Name: "TestTimeout", DesiredNumQueues: 128, @@ -635,12 +641,27 @@ func TestTimeout(t *testing.T) { }.exercise(t) } +// TestContextCancel tests cancellation of a request's context. +// The outline is: +// 1. Use a concurrency limit of 1. +// 2. Start request 1. +// 3. Use a fake clock for the following logic, to insulate from scheduler noise. +// 4. The exec fn of request 1 starts request 2, which should wait +// in its queue. +// 5. The exec fn of request 1 also forks a goroutine that waits 1 second +// and then cancels the context of request 2. +// 6. The exec fn of request 1, if StartRequest 2 returns a req2 (which is the normal case), +// calls `req2.Finish`, which is expected to return after the context cancel. +// 7. The queueset interface allows StartRequest 2 to return `nil` in this situation, +// if the scheduler gets the cancel done before StartRequest finishes; +// the test handles this without regard to whether the implementation will ever do that. +// 8. Check that the above took exactly 1 second. 
func TestContextCancel(t *testing.T) { metrics.Register() metrics.Reset() now := time.Now() clk, counter := testeventclock.NewFake(now, 0, nil) - qsf := NewQueueSetFactory(clk, counter) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) qCfg := fq.QueuingConfig{ Name: "TestContextCancel", DesiredNumQueues: 11, @@ -653,59 +674,80 @@ func TestContextCancel(t *testing.T) { t.Fatal(err) } qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) - counter.Add(1) // account for the goroutine running this test + counter.Add(1) // account for main activity of the goroutine running this test ctx1 := context.Background() - b2i := map[bool]int{false: 0, true: 1} - var qnc [2][2]int32 - req1, _ := qs.StartRequest(ctx1, &fcrequest.WorkEstimate{Seats: 1}, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) }) + pZero := func() *int32 { var zero int32; return &zero } + // counts of calls to the QueueNoteFns + queueNoteCounts := map[int]map[bool]*int32{ + 1: {false: pZero(), true: pZero()}, + 2: {false: pZero(), true: pZero()}, + } + queueNoteFn := func(fn int) func(inQueue bool) { + return func(inQueue bool) { atomic.AddInt32(queueNoteCounts[fn][inQueue], 1) } + } + fatalErrs := []string{} + var errsLock sync.Mutex + expectQNCount := func(fn int, inQueue bool, expect int32) { + if a := atomic.LoadInt32(queueNoteCounts[fn][inQueue]); a != expect { + errsLock.Lock() + defer errsLock.Unlock() + fatalErrs = append(fatalErrs, fmt.Sprintf("Got %d calls to queueNoteFn%d(%v), expected %d", a, fn, inQueue, expect)) + } + } + expectQNCounts := func(fn int, expectF, expectT int32) { + expectQNCount(fn, false, expectF) + expectQNCount(fn, true, expectT) + } + req1, _ := qs.StartRequest(ctx1, &fcrequest.WorkEstimate{Seats: 1}, 1, "", "fs1", "test", "one", queueNoteFn(1)) if req1 == nil { t.Error("Request rejected") return } - if a := atomic.AddInt32(&qnc[0][0], 0); a != 1 { - t.Errorf("Got %d calls to 
queueNoteFn1(false), expected 1", a) + expectQNCounts(1, 1, 1) + var executed1, idle1 bool + counter.Add(1) // account for the following goroutine + go func() { + defer counter.Add(-1) // account completion of this goroutine + idle1 = req1.Finish(func() { + executed1 = true + ctx2, cancel2 := context.WithCancel(context.Background()) + tBefore := clk.Now() + counter.Add(1) // account for the following goroutine + go func() { + defer counter.Add(-1) // account completion of this goroutine + clk.Sleep(time.Second) + expectQNCounts(2, 0, 1) + // account for unblocking the goroutine that waits on cancelation + counter.Add(1) + cancel2() + }() + req2, idle2a := qs.StartRequest(ctx2, &fcrequest.WorkEstimate{Seats: 1}, 2, "", "fs2", "test", "two", queueNoteFn(2)) + if idle2a { + t.Error("2nd StartRequest returned idle") + } + if req2 != nil { + idle2b := req2.Finish(func() { + t.Error("Executing req2") + }) + if idle2b { + t.Error("2nd Finish returned idle") + } + expectQNCounts(2, 1, 1) + } + tAfter := clk.Now() + dt := tAfter.Sub(tBefore) + if dt != time.Second { + t.Errorf("Unexpected: dt=%d", dt) + } + }) + }() + counter.Add(-1) // completion of main activity of goroutine running this test + clk.Run(nil) + errsLock.Lock() + defer errsLock.Unlock() + if len(fatalErrs) > 0 { + t.Error(strings.Join(fatalErrs, "; ")) } - if a := atomic.AddInt32(&qnc[0][1], 0); a != 1 { - t.Errorf("Got %d calls to queueNoteFn1(true), expected 1", a) - } - var executed1 bool - idle1 := req1.Finish(func() { - executed1 = true - ctx2, cancel2 := context.WithCancel(context.Background()) - tBefore := time.Now() - go func() { - time.Sleep(time.Second) - if a := atomic.AddInt32(&qnc[1][0], 0); a != 0 { - t.Errorf("Got %d calls to queueNoteFn2(false), expected 0", a) - } - if a := atomic.AddInt32(&qnc[1][1], 0); a != 1 { - t.Errorf("Got %d calls to queueNoteFn2(true), expected 1", a) - } - // account for unblocking the goroutine that waits on cancelation - counter.Add(1) - cancel2() - }() - req2, 
idle2a := qs.StartRequest(ctx2, &fcrequest.WorkEstimate{Seats: 1}, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) }) - if idle2a { - t.Error("2nd StartRequest returned idle") - } - if req2 != nil { - idle2b := req2.Finish(func() { - t.Error("Executing req2") - }) - if idle2b { - t.Error("2nd Finish returned idle") - } - if a := atomic.AddInt32(&qnc[1][0], 0); a != 1 { - t.Errorf("Got %d calls to queueNoteFn2(false), expected 1", a) - } - } - tAfter := time.Now() - dt := tAfter.Sub(tBefore) - if dt < time.Second || dt > 2*time.Second { - t.Errorf("Unexpected: dt=%d", dt) - } - }) if !executed1 { t.Errorf("Unexpected: executed1=%v", executed1) } @@ -714,12 +756,20 @@ func TestContextCancel(t *testing.T) { } } +func countingPromiseFactoryFactory(activeCounter counter.GoRoutineCounter) promiseFactoryFactory { + return func(qs *queueSet) promiseFactory { + return func(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promise.WriteOnce { + return testpromise.NewCountingWriteOnce(activeCounter, &qs.lock, initial, doneCh, doneVal) + } + } +} + func TestTotalRequestsExecutingWithPanic(t *testing.T) { metrics.Register() metrics.Reset() now := time.Now() clk, counter := testeventclock.NewFake(now, 0, nil) - qsf := NewQueueSetFactory(clk, counter) + qsf := newTestableQueueSetFactory(clk, countingPromiseFactoryFactory(counter)) qCfg := fq.QueuingConfig{ Name: "TestTotalRequestsExecutingWithPanic", DesiredNumQueues: 0, diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/pkg/util/flowcontrol/fairqueuing/queueset/types.go index cd954d466..5bfe85c76 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -50,11 +50,10 @@ type request struct { // decision gets set to a `requestDecision` indicating what to do // with this request. It gets set exactly once, when the request // is removed from its queue. 
The value will be decisionReject, - // decisionCancel, or decisionExecute; decisionTryAnother never - // appears here. + // decisionCancel, or decisionExecute. // - // The field is NOT thread-safe and should be protected by the - // queueset's lock. + // decision.Set is called with the queueSet locked. + // decision.Get is called without the queueSet locked. decision promise.WriteOnce // arrivalTime is the real time when the request entered this system diff --git a/pkg/util/flowcontrol/fairqueuing/testing/promise/counting.go b/pkg/util/flowcontrol/fairqueuing/testing/promise/counting.go new file mode 100644 index 000000000..7a47b02ea --- /dev/null +++ b/pkg/util/flowcontrol/fairqueuing/testing/promise/counting.go @@ -0,0 +1,105 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package promise + +import ( + "sync" + + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apiserver/pkg/util/flowcontrol/counter" + promiseifc "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" +) + +// countingPromise implements the WriteOnce interface. +// This implementation is based on a condition variable. +// This implementation tracks active goroutines: +// the given counter is decremented for a goroutine waiting for this +// varible to be set and incremented when such a goroutine is +// unblocked. 
+type countingPromise struct { + lock sync.Locker + cond sync.Cond + activeCounter counter.GoRoutineCounter // counter of active goroutines + waitingCount int // number of goroutines idle due to this being unset + isSet bool + value interface{} +} + +var _ promiseifc.WriteOnce = &countingPromise{} + +// NewCountingWriteOnce creates a WriteOnce that uses locking and counts goroutine activity. +// +// The final three arguments are like those for a regular WriteOnce factory: +// - an optional initial value, +// - an optional "done" channel, +// - the value that is Set after the "done" channel becomes selectable. +// Note that for this implementation, the reaction to `doneCh` +// becoming selectable does not wait for a Get. +// If `doneCh != nil` then the caller promises to close it reasonably promptly +// (to the degree allowed by the Go runtime scheduler), and increment the +// goroutine counter before that. +// The WriteOnce's Get method must be called without the lock held. +// The WriteOnce's Set method must be called with the lock held. +func NewCountingWriteOnce(activeCounter counter.GoRoutineCounter, lock sync.Locker, initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promiseifc.WriteOnce { + p := &countingPromise{ + lock: lock, + cond: *sync.NewCond(lock), + activeCounter: activeCounter, + isSet: initial != nil, + value: initial, + } + if doneCh != nil { + activeCounter.Add(1) // count start of the following goroutine + go func() { + defer activeCounter.Add(-1) // count completion of this goroutine + defer runtime.HandleCrash() + activeCounter.Add(-1) // count suspension for channel receive + <-doneCh + // Whatever goroutine unblocked the preceding receive MUST + // have already accounted for this activation. 
+ lock.Lock() + defer lock.Unlock() + p.Set(doneVal) + }() + } + return p +} + +func (p *countingPromise) Get() interface{} { + p.lock.Lock() + defer p.lock.Unlock() + if !p.isSet { + p.waitingCount++ + p.activeCounter.Add(-1) + p.cond.Wait() + } + return p.value +} + +func (p *countingPromise) Set(value interface{}) bool { + if p.isSet { + return false + } + p.isSet = true + p.value = value + if p.waitingCount > 0 { + p.activeCounter.Add(p.waitingCount) + p.waitingCount = 0 + p.cond.Broadcast() + } + return true +} diff --git a/pkg/util/flowcontrol/fairqueuing/testing/promise/counting_test.go b/pkg/util/flowcontrol/fairqueuing/testing/promise/counting_test.go new file mode 100644 index 000000000..28272ae7d --- /dev/null +++ b/pkg/util/flowcontrol/fairqueuing/testing/promise/counting_test.go @@ -0,0 +1,164 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package promise + +import ( + "context" + "os" + "sync" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/util/flowcontrol/counter" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" + testeventclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock" + "k8s.io/klog/v2" +) + +func TestMain(m *testing.M) { + klog.InitFlags(nil) + os.Exit(m.Run()) +} + +func TestCountingWriteOnceSet(t *testing.T) { + oldTime := time.Now() + cval := &oldTime + doneCh := make(chan struct{}) + now := time.Now() + clock, counter := testeventclock.NewFake(now, 0, nil) + var lock sync.Mutex + wr := NewCountingWriteOnce(counter, &lock, nil, doneCh, cval) + gots := make(chan interface{}, 1) + goGetExpectNotYet(t, clock, counter, wr, gots, "Set") + aval := &now + func() { + lock.Lock() + defer lock.Unlock() + if !wr.Set(aval) { + t.Error("Set() returned false") + } + }() + clock.Run(nil) + expectGotValue(t, gots, aval) + goGetAndExpect(t, clock, counter, wr, gots, aval) + later := time.Now() + bval := &later + func() { + lock.Lock() + defer lock.Unlock() + if wr.Set(bval) { + t.Error("second Set() returned true") + } + }() + goGetAndExpect(t, clock, counter, wr, gots, aval) + counter.Add(1) // account for unblocking the receive on doneCh + close(doneCh) + time.Sleep(time.Second) // give it a chance to misbehave + goGetAndExpect(t, clock, counter, wr, gots, aval) +} +func TestCountingWriteOnceCancel(t *testing.T) { + oldTime := time.Now() + cval := &oldTime + clock, counter := testeventclock.NewFake(oldTime, 0, nil) + ctx, cancel := context.WithCancel(context.Background()) + var lock sync.Mutex + wr := NewCountingWriteOnce(counter, &lock, nil, ctx.Done(), cval) + gots := make(chan interface{}, 1) + goGetExpectNotYet(t, clock, counter, wr, gots, "cancel") + counter.Add(1) // account for unblocking the receive on doneCh + cancel() + clock.Run(nil) + expectGotValue(t, gots, cval) + goGetAndExpect(t, clock, counter, wr, gots, 
cval) + later := time.Now() + bval := &later + func() { + lock.Lock() + defer lock.Unlock() + if wr.Set(bval) { + t.Error("Set() after cancel returned true") + } + }() + goGetAndExpect(t, clock, counter, wr, gots, cval) +} + +func TestCountingWriteOnceInitial(t *testing.T) { + oldTime := time.Now() + cval := &oldTime + clock, counter := testeventclock.NewFake(oldTime, 0, nil) + ctx, cancel := context.WithCancel(context.Background()) + var lock sync.Mutex + now := time.Now() + aval := &now + wr := NewCountingWriteOnce(counter, &lock, aval, ctx.Done(), cval) + gots := make(chan interface{}, 1) + goGetAndExpect(t, clock, counter, wr, gots, aval) + goGetAndExpect(t, clock, counter, wr, gots, aval) // check that a set value stays set + later := time.Now() + bval := &later + func() { + lock.Lock() + defer lock.Unlock() + if wr.Set(bval) { + t.Error("Set of initialized promise returned true") + } + }() + goGetAndExpect(t, clock, counter, wr, gots, aval) + counter.Add(1) // account for unblocking receive on doneCh + cancel() + time.Sleep(time.Second) // give it a chance to misbehave + goGetAndExpect(t, clock, counter, wr, gots, aval) +} + +func goGetExpectNotYet(t *testing.T, clk *testeventclock.Fake, grc counter.GoRoutineCounter, wr promise.WriteOnce, gots chan interface{}, trigger string) { + grc.Add(1) // count the following goroutine + go func() { + defer grc.Add(-1) // count completion of this goroutine + gots <- wr.Get() + }() + clk.Run(nil) + select { + case <-gots: + t.Errorf("Get returned before %s", trigger) + case <-time.After(time.Second): + t.Log("Good: Get did not return yet") + } + +} + +func goGetAndExpect(t *testing.T, clk *testeventclock.Fake, grc counter.GoRoutineCounter, wr promise.WriteOnce, gots chan interface{}, expected interface{}) { + grc.Add(1) + go func() { + defer grc.Add(-1) + gots <- wr.Get() + }() + clk.Run(nil) + expectGotValue(t, gots, expected) +} + +func expectGotValue(t *testing.T, gots <-chan interface{}, expected interface{}) { + 
select { + case gotVal := <-gots: + t.Logf("Got %v", gotVal) + if gotVal != expected { + t.Errorf("Get returned %v, expected: %v", gotVal, expected) + } + case <-time.After(wait.ForeverTestTimeout): + t.Error("Get did not return") + } +}