fairqueuing implementation with unit tests

Kubernetes-commit: 24065cf5be6bed995da7b7abb37ee78ff95230f0
Aaron Prindle 2019-10-29 21:54:16 -07:00 committed by Kubernetes Publisher
parent 5ec070f50a
commit a222f282e1
12 changed files with 1696 additions and 13 deletions

go.mod

@@ -26,8 +26,9 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/golang-lru v0.5.1
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
-github.com/pkg/errors v0.8.1 // indirect
+github.com/pkg/errors v0.8.1
github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021 // indirect
+github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/spf13/pflag v1.0.5
@@ -42,10 +43,10 @@
gopkg.in/square/go-jose.v2 v2.2.2
gopkg.in/yaml.v2 v2.2.4
gotest.tools v2.2.0+incompatible // indirect
-k8s.io/api v0.0.0-20191114100036-40f4bbc2b486
-k8s.io/apimachinery v0.0.0-20191114095528-3db02fd2eea7
-k8s.io/client-go v0.0.0-20191114100703-1f4f5fa64a6c
-k8s.io/component-base v0.0.0-20191114102135-42a5d5b2565c
+k8s.io/api v0.0.0
+k8s.io/apimachinery v0.0.0
+k8s.io/client-go v0.0.0
+k8s.io/component-base v0.0.0
k8s.io/klog v1.0.0
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a
k8s.io/utils v0.0.0-20191030222137-2b95a09bc58d
@@ -56,8 +57,9 @@ require (
replace (
golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7
-k8s.io/api => k8s.io/api v0.0.0-20191114100036-40f4bbc2b486
-k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20191114095528-3db02fd2eea7
-k8s.io/client-go => k8s.io/client-go v0.0.0-20191114100703-1f4f5fa64a6c
-k8s.io/component-base => k8s.io/component-base v0.0.0-20191114102135-42a5d5b2565c
+k8s.io/api => ../api
+k8s.io/apimachinery => ../apimachinery
+k8s.io/apiserver => ../apiserver
+k8s.io/client-go => ../client-go
+k8s.io/component-base => ../component-base
)

go.sum

@@ -235,6 +235,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
@@ -346,10 +347,6 @@ gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
-k8s.io/api v0.0.0-20191114100036-40f4bbc2b486/go.mod h1:IM5ceavki8HjRhUlaRYP3oGw0J/hbXKiKiSqS5AR728=
-k8s.io/apimachinery v0.0.0-20191114095528-3db02fd2eea7/go.mod h1:+6CX7hP4aLfX2sb91JYDMIp0VqDSog2kZu0BHe+lP+s=
-k8s.io/client-go v0.0.0-20191114100703-1f4f5fa64a6c/go.mod h1:SI++Xl/YwtfdRxOuhN04ry6Hl5PyasXDGmBBZnzofBo=
-k8s.io/component-base v0.0.0-20191114102135-42a5d5b2565c/go.mod h1:rwIfg3coOPWGYSmJnTp7yw1QVOB/ncA32pwgawNSR2Q=
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=


@@ -0,0 +1,33 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package counter
// GoRoutineCounter keeps track of the number of active goroutines
// working on/for something. This is a utility that makes such code more
// testable. The code uses this utility to report the number of active
// goroutines to the test code, so that the test code can advance a fake
// clock when and only when the code being tested has finished all
// the work that is ready to do at the present time.
type GoRoutineCounter interface {
// Add adds the given delta to the count of active goroutines.
// Call Add(1) before forking a goroutine, Add(-1) at the end of that goroutine.
// Call Add(-1) just before waiting on something from another goroutine (e.g.,
// just before a `select`).
// Call Add(1) just before doing something that unblocks a goroutine that is
// waiting on that something.
Add(delta int)
}
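For orientation, here is a minimal sketch of the counting protocol prescribed above; the package name and the `work` helper are invented for illustration, and the calling goroutine is assumed to already be counted:

package counterdemo // hypothetical

import "k8s.io/apiserver/pkg/util/flowcontrol/counter"

// work forks a goroutine and blocks on its result, keeping the count
// accurate at every step so that test code can advance a fake clock
// only when no counted goroutine can still make progress.
func work(grc counter.GoRoutineCounter) int {
	results := make(chan int)
	grc.Add(1) // about to fork a goroutine
	go func() {
		defer grc.Add(-1) // this goroutine is ending
		grc.Add(1)        // about to unblock the waiting receiver
		results <- 42
	}()
	grc.Add(-1) // about to wait on a receive from another goroutine
	return <-results
}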


@@ -0,0 +1,88 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fairqueuing
import (
"context"
"time"
)
// QueueSetFactory is used to create QueueSet objects.
type QueueSetFactory interface {
NewQueueSet(config QueueSetConfig) (QueueSet, error)
}
// QueueSet is the abstraction for the queuing and dispatching
// functionality of one non-exempt priority level. It covers the
// functionality described in the "Assignment to a Queue", "Queuing",
// and "Dispatching" sections of
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md
// . Some day we may have connections between priority levels, but
// today is not that day.
type QueueSet interface {
// SetConfiguration updates the configuration
SetConfiguration(QueueSetConfig) error
// Quiesce controls whether the QueueSet is operating normally or is quiescing.
// A quiescing QueueSet drains as normal but does not admit any
// new requests. Passing a non-nil handler means the system should
// be quiescing, a nil handler means the system should operate
// normally. A call to Wait while the system is quiescing
// will be rebuffed by returning tryAnother=true. If all the
// queues have no requests waiting nor executing while the system
// is quiescing then the handler will eventually be called with no
// locks held (even if the system becomes non-quiescing between the
// triggering state and the required call).
Quiesce(EmptyHandler)
// Wait uses the given hashValue as the source of entropy
// as it shuffle-shards a request into a queue and waits for
// a decision on what to do with that request. If tryAnother==true
// at return then the QueueSet has become undesirable and the client
// should try to find a different QueueSet to use; execute and
// afterExecution are irrelevant in this case. Otherwise, if execute
// then the client should start executing the request and, once the
// request finishes execution or is canceled, call afterExecution().
// Otherwise the client should not execute the
// request and afterExecution is irrelevant.
Wait(ctx context.Context, hashValue uint64) (tryAnother, execute bool, afterExecution func())
}
// QueueSetConfig defines the configuration of a QueueSet.
type QueueSetConfig struct {
// Name is used to identify a queue set, allowing for descriptive information about its intended use
Name string
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
ConcurrencyLimit int
// DesiredNumQueues is the number of queues that the API says should exist now
DesiredNumQueues int
// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
QueueLengthLimit int
// HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly
// dealing a "hand" of this many queues and then picking one of minimum length.
HandSize int
// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
// If, by the end of that time, the request has not been dispatched then it is rejected.
RequestWaitLimit time.Duration
}
// EmptyHandler is used to notify the caller of Quiesce when all the queues
// of a QueueSet have been drained.
type EmptyHandler interface {
// HandleEmpty is called to deliver the notification
HandleEmpty()
}
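To make the contract concrete, here is a hedged sketch of how a caller is expected to drive Wait; the package name and the `findQueueSet` and `serve` parameters are invented for illustration:

package fairqueuingdemo // hypothetical

import (
	"context"

	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
)

// handle retries on tryAnother, runs the request only when execute is
// true, and always calls afterExecution once the work is done.
func handle(ctx context.Context, hashValue uint64, findQueueSet func() fq.QueueSet, serve func()) bool {
	for {
		qs := findQueueSet() // e.g. re-resolve the request's priority level
		tryAnother, execute, afterExecution := qs.Wait(ctx, hashValue)
		if tryAnother {
			continue // this QueueSet is quiescing; look for another one
		}
		if !execute {
			return false // rejected: queue full, timed out, or canceled
		}
		serve()          // execute the request
		afterExecution() // release the concurrency share
		return true
	}
}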


@@ -0,0 +1,569 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queueset
import (
"context"
"math"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/runtime"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/apiserver/pkg/util/shufflesharding"
"k8s.io/klog"
)
// queueSetFactory implements the QueueSetFactory interface
// queueSetFactory makes QueueSet objects.
type queueSetFactory struct {
counter counter.GoRoutineCounter
clock clock.PassiveClock
}
// NewQueueSetFactory creates a new QueueSetFactory object
func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory {
return &queueSetFactory{
counter: counter,
clock: c,
}
}
// NewQueueSet creates a new QueueSet object
// There is a new QueueSet created for each priority level.
func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) {
return newQueueSet(config, qsf.clock, qsf.counter)
}
// queueSet is a fair queuing implementation designed with three major differences:
// 1) dispatches requests to be served rather than requests to be transmitted
// 2) serves multiple requests at once
// 3) a request's service time is not known until it finishes
// implementation of:
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md
type queueSet struct {
lock sync.Mutex
config fq.QueueSetConfig
counter counter.GoRoutineCounter
clock clock.PassiveClock
queues []*fq.Queue
virtualTime float64
estimatedServiceTime float64
lastRealTime time.Time
robinIndex int
// numRequestsEnqueued is the number of requests currently enqueued
// (eg: incremented on Enqueue, decremented on Dequeue)
numRequestsEnqueued int
emptyHandler fq.EmptyHandler
dealer *shufflesharding.Dealer
}
// initQueues is a helper function that initializes a slice of n queues
func initQueues(n, baseIndex int) []*fq.Queue {
fqqueues := make([]*fq.Queue, n)
for i := 0; i < n; i++ {
fqqueues[i] = &fq.Queue{Index: baseIndex + i, Requests: make([]*fq.Request, 0)}
}
return fqqueues
}
// newQueueSet creates a new queueSet from passed in parameters
func newQueueSet(config fq.QueueSetConfig, c clock.PassiveClock, counter counter.GoRoutineCounter) (*queueSet, error) {
dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
if err != nil {
return nil, errors.Wrap(err, "shuffle sharding dealer creation failed")
}
fq := &queueSet{
config: config,
counter: counter,
queues: initQueues(config.DesiredNumQueues, 0),
clock: c,
virtualTime: 0,
lastRealTime: c.Now(),
dealer: dealer,
}
return fq, nil
}
// SetConfiguration is used to set the configuration for a queueSet.
// It also reconciles the queueSet with the updated fields, eg: if
// DesiredNumQueues is increased, SetConfiguration adds more queues.
func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
if err != nil {
return errors.Wrap(err, "shuffle sharding dealer creation failed")
}
// Adding queues is the only thing that requires immediate action;
// removing queues is handled by omitting indexes >= DesiredNumQueues in
// chooseQueueIndexLocked
numQueues := len(qs.queues)
if config.DesiredNumQueues > numQueues {
qs.queues = append(qs.queues,
initQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...)
}
qs.config = config
qs.dealer = dealer
qs.dequeueWithChannelAsMuchAsPossible()
return nil
}
// Quiesce controls whether the QueueSet is operating normally or is quiescing.
// A quiescing QueueSet drains as normal but does not admit any
// new requests. Passing a non-nil handler means the system should
// be quiescing, a nil handler means the system should operate
// normally. A call to Wait while the system is quiescing
// will be rebuffed by returning tryAnother=true. If all the
// queues have no requests waiting nor executing while the system
// is quiescing then the handler will eventually be called with no
// locks held (even if the system becomes non-quiescing between the
// triggering state and the required call).
func (qs *queueSet) Quiesce(eh fq.EmptyHandler) {
qs.lock.Lock()
defer qs.lock.Unlock()
qs.emptyHandler = eh
if eh == nil {
return
}
// Now that the handler is set, check whether there are any requests
// queued or executing and, if not, fork an invocation of the EmptyHandler.
qs.maybeForkEmptyHandlerLocked()
}
// Wait uses the given hashValue as the source of entropy
// as it shuffle-shards a request into a queue and waits for
// a decision on what to do with that request. If tryAnother==true
// at return then the QueueSet has become undesirable and the client
// should try to find a different QueueSet to use; execute and
// afterExecution are irrelevant in this case. Otherwise, if execute
// then the client should start executing the request and, once the
// request finishes execution or is canceled, call afterExecution().
// Otherwise the client should not execute the
// request and afterExecution is irrelevant.
func (qs *queueSet) Wait(ctx context.Context, hashValue uint64) (tryAnother, execute bool, afterExecution func()) {
var req *fq.Request
shouldReturn, tryAnother, execute, afterExecution := func() (
shouldReturn, tryAnother, execute bool, afterExecution func()) {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
// A call to Wait while the system is quiescing will be rebuffed by
// returning `tryAnother=true`.
if qs.emptyHandler != nil {
return true, true, false, nil
}
// ========================================================================
// Step 1:
// 1) Start with shuffle sharding, to pick a queue.
// 2) Reject old requests that have been waiting too long
// 3) Reject current request if there is not enough concurrency shares and
// we are at max queue length
// 4) If not rejected, create a request and enqueue
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue)
// req == nil means that the request was rejected - no remaining
// concurrency shares and at max queue length already
if req == nil {
metrics.AddReject(qs.config.Name, "queue-full")
return true, false, false, func() {}
}
// ========================================================================
// Step 2:
// 1) The next step is to invoke the method that dequeues as much as possible.
// This method runs a loop, as long as there
// are non-empty queues and the number currently executing is less than the
// assured concurrency value. The body of the loop uses the fair queuing
// technique to pick a queue, dequeue the request at the head of that
// queue, increment the count of the number executing, and send true to
// the request's channel.
qs.dequeueWithChannelAsMuchAsPossible()
return false, false, false, func() {}
}()
if shouldReturn {
return tryAnother, execute, afterExecution
}
// ========================================================================
// Step 3:
// After that method finishes its loop and returns, the final step in Wait
// is to `select` (wait) on a message from the enqueud request's channel
// and return appropriately. While waiting this thread does no additional
// work so we decrement the go routine counter
qs.counter.Add(-1)
select {
case execute := <-req.DequeueChannel:
if execute {
// execute the request
return false, true, func() {
qs.finishRequestAndDequeueWithChannelAsMuchAsPossible(req)
}
}
klog.V(5).Infof("request timed out after being enqueued\n")
metrics.AddReject(qs.config.Name, "time-out")
return false, false, func() {}
case <-ctx.Done():
klog.V(5).Infof("request cancelled\n")
func() {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
// TODO(aaron-prindle) add metrics to these two cases
if req.Enqueued {
// remove the request from the queue as it has timed out
for i := range req.Queue.Requests {
if req == req.Queue.Requests[i] {
// remove the request
req.Queue.Requests = append(req.Queue.Requests[:i],
req.Queue.Requests[i+1:]...)
break
}
}
// At this point, if the qs is quiescing,
// has zero requests executing, and has zero requests enqueued
// then a call to the EmptyHandler should be forked.
qs.maybeForkEmptyHandlerLocked()
} else {
// At this point we know that req was in its queue earlier and another
// goroutine has removed req from its queue and called qs.counter.Add(1)
// in anticipation of unblocking this goroutine through the other arm of this
// select. In this case we need to decrement the counter because this goroutine
// was actually unblocked through a different code path.
qs.counter.Add(-1)
}
}()
return false, false, func() {}
}
}
// syncTimeLocked updates the virtual time of the queueSet by adding the
// real time elapsed since the last sync, scaled by the 'virtual time ratio',
// which varies inversely with the number of active queues
func (qs *queueSet) syncTimeLocked() {
realNow := qs.clock.Now()
timesincelast := realNow.Sub(qs.lastRealTime).Seconds()
qs.lastRealTime = realNow
var virtualTimeRatio float64
activeQueues := 0
reqs := 0
for _, queue := range qs.queues {
reqs += queue.RequestsExecuting
if len(queue.Requests) > 0 || queue.RequestsExecuting > 0 {
activeQueues++
}
}
if activeQueues != 0 {
// math.Min caps the numerator at the concurrency limit; the number of
// executing requests should never exceed it, but the cap keeps the
// ratio bounded if it ever does
virtualTimeRatio = math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues)
}
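// For example (numbers invented for illustration): with ConcurrencyLimit=10,
// 4 active queues, and 8 requests executing, the ratio is min(8, 10)/4 = 2,
// so each second of real time advances the virtual time by 2 virtual seconds.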
qs.virtualTime += timesincelast * virtualTimeRatio
}
func (qs *queueSet) lockAndSyncTime() {
qs.lock.Lock()
qs.syncTimeLocked()
}
// timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required
// to validate and enqueue a request for the queueSet/QueueSet:
// 1) Start with shuffle sharding, to pick a queue.
// 2) Reject old requests that have been waiting too long
// 3) Reject current request if there are not enough concurrency shares and
// we are at max queue length
// 4) If not rejected, create a request and enqueue
// returns the enqueued request on a successful enqueue
// returns nil in the case that there is no available concurrency or
// the queue length limit has been reached
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64) *fq.Request {
// Start with the shuffle sharding, to pick a queue.
queueIdx := qs.chooseQueueIndexLocked(hashValue)
queue := qs.queues[queueIdx]
// The next step is the logic to reject requests that have been waiting too long
qs.removeTimedOutRequestsFromQueueLocked(queue)
// NOTE: currently timeouts are only checked upon arrival of a new request.
// This means that requests can sit in the queue longer than the timeout if
// no new requests arrive. We prefer simplicity over promptness, at least for now.
// Create a request and enqueue
req := &fq.Request{
DequeueChannel: make(chan bool, 1),
RealEnqueueTime: qs.clock.Now(),
Queue: queue,
}
if ok := qs.rejectOrEnqueueLocked(req); !ok {
return nil
}
metrics.ObserveQueueLength(qs.config.Name, len(queue.Requests))
return req
}
// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued
// past the requestWaitLimit
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *fq.Queue) {
timeoutIdx := -1
now := qs.clock.Now()
reqs := queue.Requests
// reqs are sorted oldest -> newest
// can short circuit loop (break) if oldest requests are not timing out
// as newer requests also will not have timed out
// now - requestWaitLimit = waitLimit
waitLimit := now.Add(-qs.config.RequestWaitLimit)
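// For example (illustrative values): if now is 10:00:00 and RequestWaitLimit
// is one minute, waitLimit is 09:59:00, and any request enqueued before
// 09:59:00 has timed out.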
for i, req := range reqs {
if waitLimit.After(req.RealEnqueueTime) {
qs.counter.Add(1)
req.DequeueChannel <- false
close(req.DequeueChannel)
// get index for timed out requests
timeoutIdx = i
} else {
break
}
}
// remove timed out requests from queue
if timeoutIdx != -1 {
// timeoutIdx + 1 to remove the last timeout req
removeIdx := timeoutIdx + 1
// remove all the timeout requests
queue.Requests = reqs[removeIdx:]
// decrement the # of requestsEnqueued
qs.numRequestsEnqueued -= removeIdx
}
}
// getRequestsExecutingLocked gets the # of requests which are "executing":
// the # of requests which have been dequeued but have not yet finished
// (via the finishRequestLocked method invoked after service)
func (qs *queueSet) getRequestsExecutingLocked() int {
total := 0
for _, queue := range qs.queues {
total += queue.RequestsExecuting
}
return total
}
// chooseQueueIndexLocked uses shuffle sharding to select a queue index
// using the given hashValue and the shuffle sharding parameters of the queueSet.
func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64) int {
bestQueueIdx := -1
bestQueueLen := int(math.MaxInt32)
// DesiredNumQueues is used here instead of numQueues to omit quiescing queues
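// For example (illustrative values): with DesiredNumQueues=128 and HandSize=6,
// the dealer derives 6 of the 128 queue indexes from the request's hash and
// the shortest of those 6 queues is chosen.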
qs.dealer.Deal(hashValue, func(queueIdx int) {
thisLen := len(qs.queues[queueIdx].Requests)
if thisLen < bestQueueLen {
bestQueueIdx, bestQueueLen = queueIdx, thisLen
}
})
return bestQueueIdx
}
// updateQueueVirtualStartTime updates the virtual start time for a queue
// this is done when a new request is enqueued. For more info see:
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md#dispatching
func (qs *queueSet) updateQueueVirtualStartTime(request *fq.Request, queue *fq.Queue) {
// When a request arrives to an empty queue with no requests executing:
// len(queue.Requests) == 1 as enqueue has just happened prior (vs == 0)
if len(queue.Requests) == 1 && queue.RequestsExecuting == 0 {
// the queue's virtual start time is set to the current virtual time.
queue.VirtualStart = qs.virtualTime
}
}
// enqueue enqueues a request into the queueSet
func (qs *queueSet) enqueue(request *fq.Request) {
queue := request.Queue
queue.Enqueue(request)
qs.updateQueueVirtualStartTime(request, queue)
qs.numRequestsEnqueued++
metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued)
}
// rejectOrEnqueueLocked rejects the newly arrived request if the resource
// criteria are not met, otherwise enqueues it
func (qs *queueSet) rejectOrEnqueueLocked(request *fq.Request) bool {
queue := request.Queue
curQueueLength := len(queue.Requests)
// rejects the newly arrived request if resource criteria not met
if qs.getRequestsExecutingLocked() >= qs.config.ConcurrencyLimit &&
curQueueLength >= qs.config.QueueLengthLimit {
return false
}
qs.enqueue(request)
return true
}
// selectQueue selects the queue with the minimum virtualFinish time
// from the set of queues; the starting queue is selected via round robin
func (qs *queueSet) selectQueue() *fq.Queue {
minVirtualFinish := math.Inf(1)
var minQueue *fq.Queue
var minIndex int
for range qs.queues {
queue := qs.queues[qs.robinIndex]
if len(queue.Requests) != 0 {
currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish
minQueue = queue
minIndex = qs.robinIndex
}
}
qs.robinIndex = (qs.robinIndex + 1) % len(qs.queues)
}
// we set the round robin indexing to start at the chosen queue
// for the next round. This way the non-selected queues
// win in the case that the virtual finish times are the same
qs.robinIndex = minIndex
return minQueue
}
// dequeue dequeues a request from the queueSet
func (qs *queueSet) dequeue() (*fq.Request, bool) {
queue := qs.selectQueue()
if queue == nil {
return nil, false
}
request, ok := queue.Dequeue()
if !ok {
return nil, false
}
// When a request is dequeued for service -> queue.VirtualStart += G
queue.VirtualStart += qs.estimatedServiceTime
request.StartTime = qs.clock.Now()
// request dequeued, service has started
queue.RequestsExecuting++
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, queue.RequestsExecuting)
qs.numRequestsEnqueued--
return request, ok
}
// dequeueWithChannelAsMuchAsPossible runs a loop, as long as there
// are non-empty queues and the number currently executing is less than the
// assured concurrency value. The body of the loop uses the fair queuing
// technique to pick a queue, dequeue the request at the head of that
// queue, increment the count of the number executing, and send true
// to the request's channel.
func (qs *queueSet) dequeueWithChannelAsMuchAsPossible() {
for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit {
_, ok := qs.dequeueWithChannel()
if !ok {
break
}
}
}
// dequeueWithChannel is a convenience method for dequeueing requests that
// require a message to be sent through the request's channel; this is the
// pattern that this queueSet's Wait method relies on
func (qs *queueSet) dequeueWithChannel() (*fq.Request, bool) {
req, ok := qs.dequeue()
if !ok {
return nil, false
}
qs.counter.Add(1)
req.DequeueChannel <- true
close(req.DequeueChannel)
return req, ok
}
// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice
// and then updates the 'Index' field of the queues to be correct
func removeQueueAndUpdateIndexes(queues []*fq.Queue, index int) []*fq.Queue {
keptQueues := append(queues[:index], queues[index+1:]...)
for i := index; i < len(keptQueues); i++ {
keptQueues[i].Index--
}
return keptQueues
}
// finishRequestLocked is a callback that should be used when a previously
// dequeued request has completed its service. This callback updates
// important state in the queueSet
func (qs *queueSet) finishRequestLocked(r *fq.Request) {
S := qs.clock.Since(r.StartTime).Seconds()
// When a request finishes being served, and the actual service time was S,
// the queue's virtual start time is decremented by G - S.
r.Queue.VirtualStart -= qs.estimatedServiceTime - S
// request has finished, remove from requests executing
r.Queue.RequestsExecuting--
// Logic to remove quiesced queues
// >= as QueueIdx=25 is out of bounds for DesiredNum=25 [0...24]
if r.Queue.Index >= qs.config.DesiredNumQueues &&
len(r.Queue.Requests) == 0 &&
r.Queue.RequestsExecuting == 0 {
qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.Queue.Index)
// decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues
// is the index of the next queue after the one last dispatched from
if qs.robinIndex >= r.Queue.Index {
qs.robinIndex--
}
// At this point, if the qs is quiescing,
// has zero requests executing, and has zero requests enqueued
// then a call to the EmptyHandler should be forked.
qs.maybeForkEmptyHandlerLocked()
}
}
func (qs *queueSet) maybeForkEmptyHandlerLocked() {
if qs.emptyHandler != nil && qs.numRequestsEnqueued == 0 &&
qs.getRequestsExecutingLocked() == 0 {
qs.counter.Add(1)
go func(eh fq.EmptyHandler) {
defer runtime.HandleCrash()
defer qs.counter.Add(-1)
eh.HandleEmpty()
}(qs.emptyHandler)
}
}
// finishRequestAndDequeueWithChannelAsMuchAsPossible is a convenience method
// which calls finishRequestLocked for a given request and then dequeues as many
// requests as possible, sending a message through each dequeued request's
// channel; this is the callback that Wait hands back to its caller as afterExecution
func (qs *queueSet) finishRequestAndDequeueWithChannelAsMuchAsPossible(req *fq.Request) {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
qs.finishRequestLocked(req)
qs.dequeueWithChannelAsMuchAsPossible()
}
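Tying the pieces together, a minimal sketch of constructing and configuring a QueueSet, mirroring the wiring used by the tests below; the package name is invented and the queueset import path is assumed from this repository's layout:

package queuesetdemo // hypothetical

import (
	"time"

	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
)

func newExampleQueueSet() (fq.QueueSet, error) {
	clk, grc := clock.NewFakeEventClock(time.Now(), 0, nil)
	qsf := queueset.NewQueueSetFactory(clk, grc)
	return qsf.NewQueueSet(fq.QueueSetConfig{
		Name:             "example",
		ConcurrencyLimit: 4,
		DesiredNumQueues: 8,
		QueueLengthLimit: 16,
		HandSize:         2, // deal 2 of the 8 queues per request
		RequestWaitLimit: time.Minute,
	})
}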


@@ -0,0 +1,212 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queueset
import (
"context"
"math"
"sync/atomic"
"testing"
"time"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
)
type uniformScenario []uniformClient
type uniformClient struct {
hash uint64 // flow identifier fed to shuffle sharding
nThreads int // number of concurrent threads issuing requests
nCalls int // number of requests issued per thread
execDuration time.Duration // simulated duration of each request's execution
thinkDuration time.Duration // simulated think time between requests
}
// exerciseQueueSetUniformScenario runs a scenario of uniform clients against
// a QueueSet. Its simple pass/fail logic only works if each client's offered
// load is at least as large as its fair share of capacity.
func exerciseQueueSetUniformScenario(t *testing.T, qs fq.QueueSet, sc uniformScenario,
totalDuration time.Duration, expectPass bool, expectedAllRequests bool,
clk *clock.FakeEventClock, counter counter.GoRoutineCounter) {
now := time.Now()
t.Logf("%s: Start", clk.Now().Format("2006-01-02 15:04:05.000000000"))
integrators := make([]test.Integrator, len(sc))
var failedCount uint64
for i, uc := range sc {
integrators[i] = test.NewIntegrator(clk)
for j := 0; j < uc.nThreads; j++ {
counter.Add(1)
go func(i, j int, uc uniformClient, igr test.Integrator) {
for k := 0; k < uc.nCalls; k++ {
ClockWait(clk, counter, uc.thinkDuration)
for {
tryAnother, execute, afterExecute := qs.Wait(context.Background(), uc.hash)
t.Logf("%s: %d, %d, %d got q=%v, e=%v", clk.Now().Format("2006-01-02 15:04:05.000000000"), i, j, k, tryAnother, execute)
if tryAnother {
continue
}
if !execute {
atomic.AddUint64(&failedCount, 1)
break
}
igr.Add(1)
ClockWait(clk, counter, uc.execDuration)
afterExecute()
igr.Add(-1)
break
}
}
counter.Add(-1)
}(i, j, uc, integrators[i])
}
}
lim := now.Add(totalDuration)
clk.Run(&lim)
clk.SetTime(lim)
t.Logf("%s: End", clk.Now().Format("2006-01-02 15:04:05.000000000"))
results := make([]test.IntegratorResults, len(sc))
var sumOfAvg float64
for i := range sc {
results[i] = integrators[i].GetResults()
sumOfAvg += results[i].Average
}
idealAverage := sumOfAvg / float64(len(sc))
passes := make([]bool, len(sc))
allPass := true
for i := range sc {
relDiff := (results[i].Average - idealAverage) / idealAverage
passes[i] = math.Abs(relDiff) <= 0.1
allPass = allPass && passes[i]
}
for i := range sc {
if allPass != expectPass {
t.Errorf("Class %d got an Average of %v but the ideal was %v", i, results[i].Average, idealAverage)
} else {
t.Logf("Class %d got an Average of %v and the ideal was %v", i, results[i].Average, idealAverage)
}
}
clk.Run(nil)
if expectedAllRequests && failedCount > 0 {
t.Errorf("Expected all requests to be successful but got %v failed requests", failedCount)
} else if !expectedAllRequests && failedCount == 0 {
t.Errorf("Expected failed requests but all requests succeeded")
}
}
// TestNoRestraint expects the fairness check to fail because the dummy QueueSet exercises no control
func TestNoRestraint(t *testing.T) {
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
nrf := test.NewNoRestraintFactory()
config := fq.QueueSetConfig{}
nr, err := nrf.NewQueueSet(config)
if err != nil {
t.Fatalf("QueueSet creation failed with %v", err)
}
exerciseQueueSetUniformScenario(t, nr, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second},
{2002002002, 2, 10, time.Second, time.Second / 2},
}, time.Second*10, false, true, clk, counter)
}
func TestUniformFlows(t *testing.T) {
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
config := fq.QueueSetConfig{
Name: "TestUniformFlows",
ConcurrencyLimit: 100,
DesiredNumQueues: 128,
QueueLengthLimit: 128,
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qs, err := qsf.NewQueueSet(config)
if err != nil {
t.Fatalf("QueueSet creation failed with %v", err)
}
exerciseQueueSetUniformScenario(t, qs, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second},
{2002002002, 5, 10, time.Second, time.Second},
}, time.Second*10, true, true, clk, counter)
}
func TestDifferentFlows(t *testing.T) {
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
config := fq.QueueSetConfig{
Name: "TestDifferentFlows",
ConcurrencyLimit: 1,
DesiredNumQueues: 128,
QueueLengthLimit: 128,
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qs, err := qsf.NewQueueSet(config)
if err != nil {
t.Fatalf("QueueSet creation failed with %v", err)
}
exerciseQueueSetUniformScenario(t, qs, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second},
{2002002002, 2, 5, time.Second, time.Second / 2},
}, time.Second*10, true, true, clk, counter)
}
func TestTimeout(t *testing.T) {
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
config := fq.QueueSetConfig{
Name: "TestTimeout",
ConcurrencyLimit: 1,
DesiredNumQueues: 128,
QueueLengthLimit: 128,
HandSize: 1,
RequestWaitLimit: 0,
}
qs, err := qsf.NewQueueSet(config)
if err != nil {
t.Fatalf("QueueSet creation failed with %v", err)
}
exerciseQueueSetUniformScenario(t, qs, []uniformClient{
{1001001001, 5, 100, time.Second, time.Second},
}, time.Second*10, true, false, clk, counter)
}
// ClockWait uses the fake event clock to block this goroutine for the
// given duration, keeping the GoRoutineCounter accurate throughout
func ClockWait(clk *clock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) {
dunch := make(chan struct{})
clk.EventAfterDuration(func(time.Time) {
counter.Add(1)
close(dunch)
}, duration)
counter.Add(-1)
<-dunch
}


@@ -0,0 +1,222 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clock
import (
"container/heap"
"math/rand"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
)
// EventFunc does some work that needs to be done at or after the
// given time. After this function returns, associated work may continue
// on other goroutines only if they are counted by the GoRoutineCounter
// of the FakeEventClock handling this EventFunc.
type EventFunc func(time.Time)
// EventClock fires event on time
type EventClock interface {
clock.PassiveClock
EventAfterDuration(f EventFunc, d time.Duration)
EventAfterTime(f EventFunc, t time.Time)
}
// RealEventClock fires event on real world time
type RealEventClock struct {
clock.RealClock
}
// EventAfterDuration schedules an EventFunc
func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
ch := time.After(d)
go func() {
f(<-ch)
}()
}
// EventAfterTime schedules an EventFunc
func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) {
now := time.Now()
d := t.Sub(now)
if d <= 0 {
go f(now)
} else {
r.EventAfterDuration(f, d)
}
}
// waitGroupCounter is a wait group used as a GoRoutineCounter. This private
// type is used to disallow direct waitGroup access
type waitGroupCounter struct{ sync.WaitGroup }
// compile time assertion that waitGroupCounter meets requirements
// of GoRoutineCounter
var _ counter.GoRoutineCounter = (*waitGroupCounter)(nil)
// FakeEventClock is one whose time does not pass implicitly but
// rather is explicitly set by invocations of its SetTime method
type FakeEventClock struct {
clock.FakePassiveClock
// waiters is a heap of waiting work, sorted by time
waiters eventWaiterHeap
waitersLock sync.RWMutex
// clientWG supplies constraints on time passing in Run: the Run
// method will not pick a new time until this group's counter is zero.
clientWG *waitGroupCounter
// fuzz is the amount of noise to add to scheduling. An event
// requested to run at time T will run at some time chosen
// uniformly at random from the interval [T, T+fuzz]; the upper
// bound is exclusive iff fuzz is non-zero.
fuzz time.Duration
// rand is the random number generator to use in fuzzing
rand *rand.Rand
}
type eventWaiterHeap []eventWaiter
var _ heap.Interface = (*eventWaiterHeap)(nil)
type eventWaiter struct {
targetTime time.Time
f EventFunc
}
// NewFakeEventClock constructor. The given `r *rand.Rand` must
// henceforth not be used for any other purpose. If `r` is nil then a
// fresh one will be constructed, seeded with the current real time.
// The returned GoRoutineCounter lets Run know about additional work
// that has to complete before time can advance.
func NewFakeEventClock(t time.Time, fuzz time.Duration, r *rand.Rand) (*FakeEventClock, counter.GoRoutineCounter) {
grc := &waitGroupCounter{}
if r == nil {
r = rand.New(rand.NewSource(time.Now().UnixNano()))
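// discard a few initial outputs to decorrelate them from the seed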
r.Uint64()
r.Uint64()
r.Uint64()
}
return &FakeEventClock{
FakePassiveClock: *clock.NewFakePassiveClock(t),
clientWG: grc,
fuzz: fuzz,
rand: r,
}, grc
}
// GetNextTime returns the next time at which there is work scheduled,
// and a bool indicating whether there is any such time
func (fec *FakeEventClock) GetNextTime() (time.Time, bool) {
fec.waitersLock.RLock()
defer fec.waitersLock.RUnlock()
if len(fec.waiters) > 0 {
return fec.waiters[0].targetTime, true
}
return time.Time{}, false
}
// Run runs all the events scheduled, and all the events they
// schedule, and so on, until there are none scheduled or the limit is not
// nil and the next time would exceed the limit. The clientWG given in
// the constructor gates each advance of time.
func (fec *FakeEventClock) Run(limit *time.Time) {
for {
fec.clientWG.Wait()
t, ok := fec.GetNextTime()
if !ok || limit != nil && t.After(*limit) {
break
}
fec.SetTime(t)
}
}
// SetTime sets the time and runs to completion all events that should
// be started by the given time --- including any further events they
// schedule
func (fec *FakeEventClock) SetTime(t time.Time) {
fec.FakePassiveClock.SetTime(t)
for {
foundSome := false
func() {
fec.waitersLock.Lock()
defer fec.waitersLock.Unlock()
// This loop is because events run at a given time may schedule more
// events to run at that or an earlier time.
// Events should not advance the clock. But just in case they do...
now := fec.Now()
var wg sync.WaitGroup
for len(fec.waiters) > 0 && !now.Before(fec.waiters[0].targetTime) {
ew := heap.Pop(&fec.waiters).(eventWaiter)
wg.Add(1)
go func(f EventFunc) { f(now); wg.Done() }(ew.f)
foundSome = true
}
wg.Wait()
}()
if !foundSome {
break
}
}
}
// EventAfterDuration schedules the given function to be invoked once
// the given duration has passed.
func (fec *FakeEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
fec.waitersLock.Lock()
defer fec.waitersLock.Unlock()
now := fec.Now()
fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32())
heap.Push(&fec.waiters, eventWaiter{targetTime: now.Add(d + fd), f: f})
}
// EventAfterTime schedules the given function to be invoked once
// the given time has arrived.
func (fec *FakeEventClock) EventAfterTime(f EventFunc, t time.Time) {
fec.waitersLock.Lock()
defer fec.waitersLock.Unlock()
fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32())
heap.Push(&fec.waiters, eventWaiter{targetTime: t.Add(fd), f: f})
}
func (ewh eventWaiterHeap) Len() int { return len(ewh) }
func (ewh eventWaiterHeap) Less(i, j int) bool { return ewh[i].targetTime.Before(ewh[j].targetTime) }
func (ewh eventWaiterHeap) Swap(i, j int) { ewh[i], ewh[j] = ewh[j], ewh[i] }
func (ewh *eventWaiterHeap) Push(x interface{}) {
*ewh = append(*ewh, x.(eventWaiter))
}
func (ewh *eventWaiterHeap) Pop() interface{} {
old := *ewh
n := len(old)
x := old[n-1]
*ewh = old[:n-1]
return x
}
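A small sketch of driving the fake clock, with invented names; events fire synchronously inside Run as the fake time advances:

package clockdemo // hypothetical

import (
	"fmt"
	"time"

	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
)

func example() {
	start := time.Now()
	fec, _ := clock.NewFakeEventClock(start, 0, nil)
	fec.EventAfterDuration(func(t time.Time) {
		fmt.Println("fired at offset", t.Sub(start)) // 5s after start
	}, 5*time.Second)
	fec.Run(nil) // drains all scheduled events, advancing only the fake time
}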


@@ -0,0 +1,183 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clock
import (
"math/rand"
"sync/atomic"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
// TestableEventClock is an EventClock whose time can be set and run by tests
type TestableEventClock interface {
EventClock
SetTime(time.Time)
Run(*time.Time)
}
// settablePassiveClock allows setting current time of a passive clock
type settablePassiveClock interface {
clock.PassiveClock
SetTime(time.Time)
}
func exerciseTestableEventClock(t *testing.T, ec TestableEventClock, fuzz time.Duration) {
exercisePassiveClock(t, ec)
var numDone int32
now := ec.Now()
strictable := true
const batchSize = 100
times := make(chan time.Time, batchSize+1)
try := func(abs, strict bool, d time.Duration) {
f := func(u time.Time) {
realD := ec.Since(now)
atomic.AddInt32(&numDone, 1)
times <- u
if realD < d || strict && strictable && realD > d+fuzz {
t.Errorf("Asked for %v, got %v", d, realD)
}
}
if abs {
ec.EventAfterTime(f, now.Add(d))
} else {
ec.EventAfterDuration(f, d)
}
}
try(true, true, time.Minute)
for i := 0; i < batchSize; i++ {
d := time.Duration(rand.Intn(30)-3) * time.Second
try(i%2 == 0, d >= 0, d)
}
ec.Run(nil)
if numDone != batchSize+1 {
t.Errorf("Got only %v events", numDone)
}
lastTime := now.Add(-3 * time.Second)
for i := 0; i <= batchSize; i++ {
nextTime := <-times
if nextTime.Before(lastTime) {
t.Errorf("Got %s after %s", nextTime, lastTime)
}
lastTime = nextTime
}
endTime := ec.Now()
dx := endTime.Sub(now)
if dx > time.Minute+fuzz {
t.Errorf("Run started at %#+v, ended at %#+v, dx=%d", now, endTime, dx)
}
now = endTime
var shouldRun int32
strictable = false
for i := 0; i < batchSize; i++ {
d := time.Duration(rand.Intn(30)-3) * time.Second
try(i%2 == 0, d >= 0, d)
if d <= 12*time.Second {
shouldRun++
}
}
ec.SetTime(now.Add(13*time.Second - 1))
if numDone != batchSize+1+shouldRun {
t.Errorf("Expected %v, but %v ran", shouldRun, numDone-batchSize-1)
}
lastTime = now.Add(-3 * time.Second)
for i := int32(0); i < shouldRun; i++ {
nextTime := <-times
if nextTime.Before(lastTime) {
t.Errorf("Got %s after %s", nextTime, lastTime)
}
lastTime = nextTime
}
}
func exercisePassiveClock(t *testing.T, pc settablePassiveClock) {
t1 := time.Now()
t2 := t1.Add(time.Hour)
pc.SetTime(t1)
tx := pc.Now()
if tx != t1 {
t.Errorf("SetTime(%#+v); Now() => %#+v", t1, tx)
}
dx := pc.Since(t1)
if dx != 0 {
t.Errorf("Since() => %v", dx)
}
pc.SetTime(t2)
dx = pc.Since(t1)
if dx != time.Hour {
t.Errorf("Since() => %v", dx)
}
tx = pc.Now()
if tx != t2 {
t.Errorf("Now() => %#+v", tx)
}
}
func TestFakeEventClock(t *testing.T) {
startTime := time.Now()
fec, _ := NewFakeEventClock(startTime, 0, nil)
exerciseTestableEventClock(t, fec, 0)
fec, _ = NewFakeEventClock(startTime, time.Second, nil)
exerciseTestableEventClock(t, fec, time.Second)
}
func exerciseEventClock(t *testing.T, ec EventClock, relax func(time.Duration)) {
var numDone int32
now := ec.Now()
const batchSize = 100
times := make(chan time.Time, batchSize+1)
try := func(abs bool, d time.Duration) {
f := func(u time.Time) {
realD := ec.Since(now)
atomic.AddInt32(&numDone, 1)
times <- u
if realD < d {
t.Errorf("Asked for %v, got %v", d, realD)
}
}
if abs {
ec.EventAfterTime(f, now.Add(d))
} else {
ec.EventAfterDuration(f, d)
}
}
try(true, time.Millisecond*3300)
for i := 0; i < batchSize; i++ {
d := time.Duration(rand.Intn(30)-3) * time.Millisecond * 100
try(i%2 == 0, d)
}
relax(time.Second * 4)
if atomic.LoadInt32(&numDone) != batchSize+1 {
t.Errorf("Got only %v events", numDone)
}
lastTime := now
for i := 0; i <= batchSize; i++ {
nextTime := <-times
if nextTime.Before(now) {
continue
}
dt := nextTime.Sub(lastTime) / (50 * time.Millisecond)
if dt < 0 {
t.Errorf("Got %s after %s", nextTime, lastTime)
}
lastTime = nextTime
}
}
func TestRealEventClock(t *testing.T) {
exerciseEventClock(t, RealEventClock{}, func(d time.Duration) { time.Sleep(d) })
}


@@ -0,0 +1,103 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"math"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
// Integrator computes the integral of some variable X over time as
// read from a particular clock. The integral starts when the
// Integrator is created, and ends at the latest operation on the
// Integrator.
type Integrator interface {
Set(float64) // set the value of X
Add(float64) // add the given quantity to X
GetResults() IntegratorResults
}
// IntegratorResults holds statistical abstracts of the integration
type IntegratorResults struct {
Duration float64 //seconds
Average float64
Deviation float64 //sqrt(avg((value-avg)^2))
}
type integrator struct {
sync.Mutex
clk clock.PassiveClock
lastTime time.Time
x float64
integrals [3]float64 // integral of x^0, x^1, and x^2
}
// NewIntegrator makes one that uses the given clock
func NewIntegrator(clk clock.PassiveClock) Integrator {
return &integrator{
clk: clk,
lastTime: clk.Now(),
}
}
func (igr *integrator) Set(x float64) {
igr.Lock()
igr.updateLocked()
igr.x = x
igr.Unlock()
}
func (igr *integrator) Add(deltaX float64) {
igr.Lock()
igr.updateLocked()
igr.x += deltaX
igr.Unlock()
}
func (igr *integrator) updateLocked() {
now := igr.clk.Now()
dt := now.Sub(igr.lastTime).Seconds()
igr.lastTime = now
igr.integrals[0] += dt
igr.integrals[1] += dt * igr.x
igr.integrals[2] += dt * igr.x * igr.x
}
func (igr *integrator) GetResults() (results IntegratorResults) {
igr.Lock()
defer igr.Unlock()
igr.updateLocked()
results.Duration = igr.integrals[0]
if results.Duration <= 0 {
results.Average = math.NaN()
results.Deviation = math.NaN()
return
}
results.Average = igr.integrals[1] / igr.integrals[0]
// Deviation is sqrt( Integral( (x - xbar)^2 dt) / Duration )
// = sqrt( Integral( x^2 + xbar^2 -2*x*xbar dt ) / Duration )
// = sqrt( ( Integral( x^2 dt ) + Duration * xbar^2 - 2*xbar*Integral(x dt) ) / Duration)
// = sqrt( Integral(x^2 dt)/Duration - xbar^2 )
variance := igr.integrals[2]/igr.integrals[0] - results.Average*results.Average
if variance > 0 {
results.Deviation = math.Sqrt(variance)
}
return
}
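A worked example of what the integrator computes, with invented values, using the fake passive clock from k8s.io/apimachinery:

package integratordemo // hypothetical

import (
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
	test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
)

func example() test.IntegratorResults {
	t0 := time.Now()
	clk := clock.NewFakePassiveClock(t0)
	igr := test.NewIntegrator(clk)
	igr.Set(2) // X=2 during the first second
	clk.SetTime(t0.Add(1 * time.Second))
	igr.Set(0) // X=0 during the second second
	clk.SetTime(t0.Add(2 * time.Second))
	// Duration=2, Average=(2*1+0*1)/2=1, Deviation=sqrt(4/2-1*1)=1
	return igr.GetResults()
}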


@@ -0,0 +1,49 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"context"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
)
// NewNoRestraintFactory makes a QueueSetFactory that produces
// QueueSets that exert no restraint --- every request is dispatched
// for execution as soon as it arrives.
func NewNoRestraintFactory() fq.QueueSetFactory {
return noRestraintFactory{}
}
type noRestraintFactory struct{}
func (noRestraintFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) {
return noRestraint{}, nil
}
type noRestraint struct{}
func (noRestraint) SetConfiguration(config fq.QueueSetConfig) error {
return nil
}
func (noRestraint) Quiesce(fq.EmptyHandler) {
}
func (noRestraint) Wait(ctx context.Context, hashValue uint64) (tryAnother, execute bool, afterExecution func()) {
return false, true, func() {}
}


@@ -0,0 +1,73 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fairqueuing
import (
"time"
)
// Request is a temporary container for "requests" with additional tracking
// fields required by the fair queuing implementation (FQScheduler)
type Request struct {
//TODO(aaron-prindle) seq is only used for testing, this was abstracted
// via an interface before, keeping this for now
QueueIdx int
Queue *Queue
StartTime time.Time
DequeueChannel chan bool
RealEnqueueTime time.Time
Enqueued bool
}
// Queue is an array of requests with additional metadata required for
// the FQScheduler
type Queue struct {
Requests []*Request
VirtualStart float64
RequestsExecuting int
Index int
}
// Enqueue enqueues a request into the queue
func (q *Queue) Enqueue(request *Request) {
request.Enqueued = true
q.Requests = append(q.Requests, request)
}
// Dequeue dequeues a request from the queue
func (q *Queue) Dequeue() (*Request, bool) {
if len(q.Requests) == 0 {
return nil, false
}
request := q.Requests[0]
q.Requests = q.Requests[1:]
request.Enqueued = false
return request, true
}
// GetVirtualFinish returns the expected virtual finish time of the request at
// index J in the queue, given estimated service time G
func (q *Queue) GetVirtualFinish(J int, G float64) float64 {
// The virtual finish time of request number J in the queue
// (counting from J=1 for the head) is J * G + (virtual start time).
// Here J is 0-based (eg: queue.Requests[0] -> J=0), hence the J+1 below.
jg := float64(J+1) * G
return jg + q.VirtualStart
}
}
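A worked example of the virtual finish formula, with invented numbers:

package typesdemo // hypothetical

import fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"

func example() {
	q := &fq.Queue{VirtualStart: 100.0}
	const g = 1.5                // estimated service time, in virtual seconds
	_ = q.GetVirtualFinish(0, g) // head of queue: 1*1.5 + 100.0 = 101.5
	_ = q.GetVirtualFinish(2, g) // third request: 3*1.5 + 100.0 = 104.5
}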


@@ -0,0 +1,152 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
const (
namespace = "apiserver"
subsystem = "flowcontrol"
)
const (
priorityLevel = "priorityLevel"
flowSchema = "flowSchema"
)
var (
queueLengthBuckets = []float64{0, 10, 25, 50, 100, 250, 500, 1000}
requestDurationSecondsBuckets = []float64{0, 0.005, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30}
)
func init() {
prometheus.MustRegister(apiserverRejectedRequests)
prometheus.MustRegister(apiserverCurrentInqueueRequests)
prometheus.MustRegister(apiserverRequestQueueLength)
prometheus.MustRegister(apiserverRequestConcurrencyLimit)
prometheus.MustRegister(apiserverCurrentExecutingRequests)
prometheus.MustRegister(apiserverRequestWaitingSeconds)
prometheus.MustRegister(apiserverRequestExecutionSeconds)
}
var (
apiserverRejectedRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "rejectedRequests",
Help: "Number of rejected requests by api priority and fairness system",
},
[]string{priorityLevel, "reason"},
)
apiserverCurrentInqueueRequests = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "currentInqueueRequests",
Help: "Number of requests currently pending in the queue by the api priority and fairness system",
},
[]string{priorityLevel},
)
apiserverRequestQueueLength = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "requestQueueLength",
Help: "Length of queue in the api priority and fairness system",
Buckets: queueLengthBuckets,
},
[]string{priorityLevel},
)
apiserverRequestConcurrencyLimit = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "requestConcurrencyLimit",
Help: "Shared concurrency limit in the api priority and fairness system",
},
[]string{priorityLevel},
)
apiserverCurrentExecutingRequests = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "currentExecutingRequests",
Help: "Number of requests currently executing in the api priority and fairness system",
},
[]string{priorityLevel},
)
apiserverRequestWaitingSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_wait_durationSeconds",
Help: "Length of time a request spent waiting in its queue",
Buckets: requestDurationSecondsBuckets,
},
[]string{priorityLevel, flowSchema, "execute"},
)
apiserverRequestExecutionSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "requestExecutionSeconds",
Help: "Time of request executing in the api priority and fairness system",
Buckets: requestDurationSecondsBuckets,
},
[]string{priorityLevel, flowSchema},
)
)
// UpdateFlowControlRequestsInQueue updates the value for the # of requests in the specified queues in flow control
func UpdateFlowControlRequestsInQueue(priorityLevel string, inqueue int) {
apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel).Set(float64(inqueue))
}
// UpdateFlowControlRequestsExecuting updates the value for the # of requests executing in flow control
func UpdateFlowControlRequestsExecuting(priorityLevel string, executing int) {
apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel).Set(float64(executing))
}
// UpdateSharedConcurrencyLimit updates the value for the concurrency limit in flow control
func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) {
apiserverRequestConcurrencyLimit.WithLabelValues(priorityLevel).Set(float64(limit))
}
// AddReject increments the # of rejected requests for flow control
func AddReject(priorityLevel string, reason string) {
apiserverRejectedRequests.WithLabelValues(priorityLevel, reason).Add(1)
}
// ObserveQueueLength observes the queue length for flow control
func ObserveQueueLength(priorityLevel string, length int) {
apiserverRequestQueueLength.WithLabelValues(priorityLevel).Observe(float64(length))
}
// ObserveWaitingDuration observes the wait duration for flow control
func ObserveWaitingDuration(priorityLevel, flowSchema, execute string, waitTime time.Duration) {
apiserverRequestWaitingSeconds.WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds())
}
// ObserveExecutionDuration observes the execution duration for flow control
func ObserveExecutionDuration(priorityLevel, flowSchema string, executionTime time.Duration) {
apiserverRequestExecutionSeconds.WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds())
}
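Since the collectors are registered on the default Prometheus registry in init(), a stock promhttp handler exposes them. A hedged sketch follows; the HTTP wiring and label values are invented for illustration and are not part of this commit:

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
)

func main() {
	// Record a few hypothetical observations.
	metrics.UpdateSharedConcurrencyLimit("workload-low", 100)
	metrics.ObserveQueueLength("workload-low", 7)
	metrics.AddReject("workload-low", "queue-full")
	// Expose everything registered on the default registry.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":9090", nil)
}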