diff --git a/go.mod b/go.mod index 2819639a4..c6a38dd40 100644 --- a/go.mod +++ b/go.mod @@ -26,8 +26,9 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/hashicorp/golang-lru v0.5.1 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 - github.com/pkg/errors v0.8.1 // indirect + github.com/pkg/errors v0.8.1 github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021 // indirect + github.com/prometheus/client_golang v1.0.0 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 github.com/sirupsen/logrus v1.4.2 // indirect github.com/spf13/pflag v1.0.5 @@ -42,10 +43,10 @@ require ( gopkg.in/square/go-jose.v2 v2.2.2 gopkg.in/yaml.v2 v2.2.4 gotest.tools v2.2.0+incompatible // indirect - k8s.io/api v0.0.0-20191114100036-40f4bbc2b486 - k8s.io/apimachinery v0.0.0-20191114095528-3db02fd2eea7 - k8s.io/client-go v0.0.0-20191114100703-1f4f5fa64a6c - k8s.io/component-base v0.0.0-20191114102135-42a5d5b2565c + k8s.io/api v0.0.0 + k8s.io/apimachinery v0.0.0 + k8s.io/client-go v0.0.0 + k8s.io/component-base v0.0.0 k8s.io/klog v1.0.0 k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a k8s.io/utils v0.0.0-20191030222137-2b95a09bc58d @@ -56,8 +57,9 @@ require ( replace ( golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 - k8s.io/api => k8s.io/api v0.0.0-20191114100036-40f4bbc2b486 - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20191114095528-3db02fd2eea7 - k8s.io/client-go => k8s.io/client-go v0.0.0-20191114100703-1f4f5fa64a6c - k8s.io/component-base => k8s.io/component-base v0.0.0-20191114102135-42a5d5b2565c + k8s.io/api => ../api + k8s.io/apimachinery => ../apimachinery + k8s.io/apiserver => ../apiserver + k8s.io/client-go => ../client-go + k8s.io/component-base => ../component-base ) diff --git a/go.sum b/go.sum index 41c2584ef..8e2af0789 100644 --- a/go.sum +++ b/go.sum @@ -235,6 +235,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -346,10 +347,6 @@ gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -k8s.io/api v0.0.0-20191114100036-40f4bbc2b486/go.mod h1:IM5ceavki8HjRhUlaRYP3oGw0J/hbXKiKiSqS5AR728= -k8s.io/apimachinery v0.0.0-20191114095528-3db02fd2eea7/go.mod h1:+6CX7hP4aLfX2sb91JYDMIp0VqDSog2kZu0BHe+lP+s= -k8s.io/client-go v0.0.0-20191114100703-1f4f5fa64a6c/go.mod h1:SI++Xl/YwtfdRxOuhN04ry6Hl5PyasXDGmBBZnzofBo= -k8s.io/component-base v0.0.0-20191114102135-42a5d5b2565c/go.mod h1:rwIfg3coOPWGYSmJnTp7yw1QVOB/ncA32pwgawNSR2Q= k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= diff --git a/pkg/util/flowcontrol/counter/interface.go b/pkg/util/flowcontrol/counter/interface.go new file mode 100644 index 000000000..0418e1217 --- /dev/null +++ b/pkg/util/flowcontrol/counter/interface.go @@ -0,0 +1,33 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package counter + +// GoRoutineCounter keeps track of the number of active goroutines +// working on/for something. This is a utility that makes such code more +// testable. The code uses this utility to report the number of active +// goroutines to the test code, so that the test code can advance a fake +// clock when and only when the code being tested has finished all +// the work that is ready to do at the present time. +type GoRoutineCounter interface { + // Add adds the given delta to the count of active goroutines. + // Call Add(1) before forking a goroutine, Add(-1) at the end of that goroutine. + // Call Add(-1) just before waiting on something from another goroutine (e.g., + // just before a `select`). + // Call Add(1) just before doing something that unblocks a goroutine that is + // waiting on that something. + Add(delta int) +} diff --git a/pkg/util/flowcontrol/fairqueuing/interface.go b/pkg/util/flowcontrol/fairqueuing/interface.go new file mode 100644 index 000000000..2de7f92f4 --- /dev/null +++ b/pkg/util/flowcontrol/fairqueuing/interface.go @@ -0,0 +1,88 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fairqueuing + +import ( + "context" + "time" +) + +// QueueSetFactory is used to create QueueSet objects. +type QueueSetFactory interface { + NewQueueSet(config QueueSetConfig) (QueueSet, error) +} + +// QueueSet is the abstraction for the queuing and dispatching +// functionality of one non-exempt priority level. It covers the +// functionality described in the "Assignment to a Queue", "Queuing", +// and "Dispatching" sections of +// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md +// . Some day we may have connections between priority levels, but +// today is not that day. +type QueueSet interface { + // SetConfiguration updates the configuration + SetConfiguration(QueueSetConfig) error + + // Quiesce controls whether the QueueSet is operating normally or is quiescing. + // A quiescing QueueSet drains as normal but does not admit any + // new requests. Passing a non-nil handler means the system should + // be quiescing, a nil handler means the system should operate + // normally. A call to Wait while the system is quiescing + // will be rebuffed by returning tryAnother=true. If all the + // queues have no requests waiting nor executing while the system + // is quiescing then the handler will eventually be called with no + // locks held (even if the system becomes non-quiescing between the + // triggering state and the required call). + Quiesce(EmptyHandler) + + // Wait uses the given hashValue as the source of entropy + // as it shuffle-shards a request into a queue and waits for + // a decision on what to do with that request. If tryAnother==true + // at return then the QueueSet has become undesirable and the client + // should try to find a different QueueSet to use; execute and + // afterExecution are irrelevant in this case. Otherwise, if execute + // then the client should start executing the request and, once the + // request finishes execution or is canceled, call afterExecution(). + // Otherwise the client should not execute the + // request and afterExecution is irrelevant. + Wait(ctx context.Context, hashValue uint64) (tryAnother, execute bool, afterExecution func()) +} + +// QueueSetConfig defines the configuration of a QueueSet. +type QueueSetConfig struct { + // Name is used to identify a queue set, allowing for descriptive information about its intended use + Name string + // ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time + ConcurrencyLimit int + // DesiredNumQueues is the number of queues that the API says should exist now + DesiredNumQueues int + // QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time + QueueLengthLimit int + // HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly + // dealing a "hand" of this many queues and then picking one of minimum length. + HandSize int + // RequestWaitLimit is the maximum amount of time that a request may wait in a queue. + // If, by the end of that time, the request has not been dispatched then it is rejected. + RequestWaitLimit time.Duration +} + +// EmptyHandler is used to notify the callee when all the queues +// of a QueueSet have been drained. +type EmptyHandler interface { + // HandleEmpty is called to deliver the notification + HandleEmpty() +} diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go new file mode 100644 index 000000000..b3d78ef6b --- /dev/null +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -0,0 +1,569 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queueset + +import ( + "context" + "math" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/runtime" + + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apiserver/pkg/util/flowcontrol/counter" + fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + "k8s.io/apiserver/pkg/util/shufflesharding" + "k8s.io/klog" +) + +// queueSetFactory implements the QueueSetFactory interface +// queueSetFactory makes QueueSet objects. +type queueSetFactory struct { + counter counter.GoRoutineCounter + clock clock.PassiveClock +} + +// NewQueueSetFactory creates a new QueueSetFactory object +func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory { + return &queueSetFactory{ + counter: counter, + clock: c, + } +} + +// NewQueueSet creates a new QueueSet object +// There is a new QueueSet created for each priority level. +func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { + return newQueueSet(config, qsf.clock, qsf.counter) +} + +// queueSet is a fair queuing implementation designed with three major differences: +// 1) dispatches requests to be served rather than requests to be transmitted +// 2) serves multiple requests at once +// 3) a request's service time is not known until it finishes +// implementation of: +// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md +type queueSet struct { + lock sync.Mutex + config fq.QueueSetConfig + counter counter.GoRoutineCounter + clock clock.PassiveClock + queues []*fq.Queue + virtualTime float64 + estimatedServiceTime float64 + lastRealTime time.Time + robinIndex int + // numRequestsEnqueued is the number of requests currently enqueued + // (eg: incremeneted on Enqueue, decremented on Dequue) + numRequestsEnqueued int + emptyHandler fq.EmptyHandler + dealer *shufflesharding.Dealer +} + +// initQueues is a helper method for initializing an array of n queues +func initQueues(n, baseIndex int) []*fq.Queue { + fqqueues := make([]*fq.Queue, n) + for i := 0; i < n; i++ { + fqqueues[i] = &fq.Queue{Index: baseIndex + i, Requests: make([]*fq.Request, 0)} + } + return fqqueues +} + +// newQueueSet creates a new queueSet from passed in parameters +func newQueueSet(config fq.QueueSetConfig, c clock.PassiveClock, counter counter.GoRoutineCounter) (*queueSet, error) { + dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize) + if err != nil { + return nil, errors.Wrap(err, "shuffle sharding dealer creation failed") + } + + fq := &queueSet{ + config: config, + counter: counter, + queues: initQueues(config.DesiredNumQueues, 0), + clock: c, + virtualTime: 0, + lastRealTime: c.Now(), + dealer: dealer, + } + return fq, nil +} + +// SetConfiguration is used to set the configuration for a queueSet +// update handling for when fields are updated is handled here as well - +// eg: if DesiredNum is increased, SetConfiguration reconciles by +// adding more queues. +func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error { + qs.lockAndSyncTime() + defer qs.lock.Unlock() + + dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize) + if err != nil { + return errors.Wrap(err, "shuffle sharding dealer creation failed") + } + + // Adding queues is the only thing that requires immediate action + // Removing queues is handled by omitting indexes >DesiredNum from + // chooseQueueIndexLocked + numQueues := len(qs.queues) + if config.DesiredNumQueues > numQueues { + qs.queues = append(qs.queues, + initQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...) + } + + qs.config = config + qs.dealer = dealer + + qs.dequeueWithChannelAsMuchAsPossible() + return nil +} + +// Quiesce controls whether the QueueSet is operating normally or is quiescing. +// A quiescing QueueSet drains as normal but does not admit any +// new requests. Passing a non-nil handler means the system should +// be quiescing, a nil handler means the system should operate +// normally. A call to Wait while the system is quiescing +// will be rebuffed by returning tryAnother=true. If all the +// queues have no requests waiting nor executing while the system +// is quiescing then the handler will eventually be called with no +// locks held (even if the system becomes non-quiescing between the +// triggering state and the required call). +func (qs *queueSet) Quiesce(eh fq.EmptyHandler) { + qs.lock.Lock() + defer qs.lock.Unlock() + if eh == nil { + qs.emptyHandler = eh + return + } + // Here we check whether there are any requests queued or executing and + // if not then fork an invocation of the EmptyHandler. + qs.maybeForkEmptyHandlerLocked() + + qs.emptyHandler = eh +} + +// Wait uses the given hashValue as the source of entropy +// as it shuffle-shards a request into a queue and waits for +// a decision on what to do with that request. If tryAnother==true +// at return then the QueueSet has become undesirable and the client +// should try to find a different QueueSet to use; execute and +// afterExecution are irrelevant in this case. Otherwise, if execute +// then the client should start executing the request and, once the +// request finishes execution or is canceled, call afterExecution(). +// Otherwise the client should not execute the +// request and afterExecution is irrelevant. +func (qs *queueSet) Wait(ctx context.Context, hashValue uint64) (tryAnother, execute bool, afterExecution func()) { + var req *fq.Request + shouldReturn, tryAnother, execute, afterExecution := func() ( + shouldReturn, tryAnother, execute bool, afterExecution func()) { + + qs.lockAndSyncTime() + defer qs.lock.Unlock() + // A call to Wait while the system is quiescing will be rebuffed by + // returning `tryAnother=true`. + if qs.emptyHandler != nil { + return true, true, false, nil + } + + // ======================================================================== + // Step 1: + // 1) Start with shuffle sharding, to pick a queue. + // 2) Reject old requests that have been waiting too long + // 3) Reject current request if there is not enough concurrency shares and + // we are at max queue length + // 4) If not rejected, create a request and enqueue + req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue) + // req == nil means that the request was rejected - no remaining + // concurrency shares and at max queue length already + if req == nil { + metrics.AddReject(qs.config.Name, "queue-full") + return true, false, false, func() {} + } + + // ======================================================================== + // Step 2: + // 1) The next step is to invoke the method that dequeues as much as possible. + + // This method runs a loop, as long as there + // are non-empty queues and the number currently executing is less than the + // assured concurrency value. The body of the loop uses the fair queuing + // technique to pick a queue, dequeue the request at the head of that + // queue, increment the count of the number executing, and send true to + // the request's channel. + qs.dequeueWithChannelAsMuchAsPossible() + return false, false, false, func() {} + }() + if shouldReturn { + return tryAnother, execute, afterExecution + } + + // ======================================================================== + // Step 3: + // After that method finishes its loop and returns, the final step in Wait + // is to `select` (wait) on a message from the enqueud request's channel + // and return appropriately. While waiting this thread does no additional + // work so we decrement the go routine counter + qs.counter.Add(-1) + + select { + case execute := <-req.DequeueChannel: + if execute { + // execute the request + return false, true, func() { + qs.finishRequestAndDequeueWithChannelAsMuchAsPossible(req) + } + } + klog.V(5).Infof("request timed out after being enqueued\n") + metrics.AddReject(qs.config.Name, "time-out") + return false, false, func() {} + case <-ctx.Done(): + klog.V(5).Infof("request cancelled\n") + func() { + qs.lockAndSyncTime() + defer qs.lock.Unlock() + + // TODO(aaron-prindle) add metrics to these two cases + if req.Enqueued { + // remove the request from the queue as it has timed out + for i := range req.Queue.Requests { + if req == req.Queue.Requests[i] { + // remove the request + req.Queue.Requests = append(req.Queue.Requests[:i], + req.Queue.Requests[i+1:]...) + break + } + } + // At this point, if the qs is quiescing, + // has zero requests executing, and has zero requests enqueued + // then a call to the EmptyHandler should be forked. + qs.maybeForkEmptyHandlerLocked() + } else { + // At this point we know that req was in its queue earlier and another + // goroutine has removed req from its queue and called qs.counter.Add(1) + // in anticipation of unblocking this goroutine through the other arm of this + // select. In this case we need to decrement the counter because this goroutine + // was actually unblocked through a different code path. + qs.counter.Add(-1) + } + }() + return false, false, func() {} + } +} + +// syncTimeLocked is used to sync the time of the queueSet by looking at the elapsed +// time since the last sync and this value based on the 'virtualtime ratio' +// which scales inversely to the # of active flows +func (qs *queueSet) syncTimeLocked() { + realNow := qs.clock.Now() + timesincelast := realNow.Sub(qs.lastRealTime).Seconds() + qs.lastRealTime = realNow + var virtualTimeRatio float64 + + activeQueues := 0 + reqs := 0 + for _, queue := range qs.queues { + reqs += queue.RequestsExecuting + + if len(queue.Requests) > 0 || queue.RequestsExecuting > 0 { + activeQueues++ + } + } + if activeQueues != 0 { + // TODO(aaron-prindle) document the math.Min usage + virtualTimeRatio = math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues) + } + + qs.virtualTime += timesincelast * virtualTimeRatio +} + +func (qs *queueSet) lockAndSyncTime() { + qs.lock.Lock() + qs.syncTimeLocked() +} + +// timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required +// to validate and enqueue a request for the queueSet/QueueSet: +// 1) Start with shuffle sharding, to pick a queue. +// 2) Reject old requests that have been waiting too long +// 3) Reject current request if there is not enough concurrency shares and +// we are at max queue length +// 4) If not rejected, create a request and enqueue +// returns the enqueud request on a successful enqueue +// returns nil in the case that there is no available concurrency or +// the queuelengthlimit has been reached +func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64) *fq.Request { + // Start with the shuffle sharding, to pick a queue. + queueIdx := qs.chooseQueueIndexLocked(hashValue) + queue := qs.queues[queueIdx] + // The next step is the logic to reject requests that have been waiting too long + qs.removeTimedOutRequestsFromQueueLocked(queue) + // NOTE: currently timeout is only checked for each new request. This means that there can be + // requests that are in the queue longer than the timeout if there are no new requests + // We prefer the simplicity over the promptness, at least for now. + + // Create a request and enqueue + req := &fq.Request{ + DequeueChannel: make(chan bool, 1), + RealEnqueueTime: qs.clock.Now(), + Queue: queue, + } + if ok := qs.rejectOrEnqueueLocked(req); !ok { + return nil + } + metrics.ObserveQueueLength(qs.config.Name, len(queue.Requests)) + return req +} + +// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued +// past the requestWaitLimit +func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *fq.Queue) { + timeoutIdx := -1 + now := qs.clock.Now() + reqs := queue.Requests + // reqs are sorted oldest -> newest + // can short circuit loop (break) if oldest requests are not timing out + // as newer requests also will not have timed out + + // now - requestWaitLimit = waitLimit + waitLimit := now.Add(-qs.config.RequestWaitLimit) + for i, req := range reqs { + if waitLimit.After(req.RealEnqueueTime) { + qs.counter.Add(1) + req.DequeueChannel <- false + close(req.DequeueChannel) + // get index for timed out requests + timeoutIdx = i + } else { + break + } + } + // remove timed out requests from queue + if timeoutIdx != -1 { + // timeoutIdx + 1 to remove the last timeout req + removeIdx := timeoutIdx + 1 + // remove all the timeout requests + queue.Requests = reqs[removeIdx:] + // decrement the # of requestsEnqueued + qs.numRequestsEnqueued -= removeIdx + } +} + +// getRequestsExecutingLocked gets the # of requests which are "executing": +// this is the# of requests/requests which have been dequeued but have not had +// finished (via the FinishRequest method invoked after service) +func (qs *queueSet) getRequestsExecutingLocked() int { + total := 0 + for _, queue := range qs.queues { + total += queue.RequestsExecuting + } + return total +} + +// chooseQueueIndexLocked uses shuffle sharding to select a queue index +// using the given hashValue and the shuffle sharding parameters of the queueSet. +func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64) int { + bestQueueIdx := -1 + bestQueueLen := int(math.MaxInt32) + // DesiredNum is used here instead of numQueues to omit quiescing queues + qs.dealer.Deal(hashValue, func(queueIdx int) { + thisLen := len(qs.queues[queueIdx].Requests) + if thisLen < bestQueueLen { + bestQueueIdx, bestQueueLen = queueIdx, thisLen + } + }) + return bestQueueIdx +} + +// updateQueueVirtualStartTime updates the virtual start time for a queue +// this is done when a new request is enqueued. For more info see: +// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md#dispatching +func (qs *queueSet) updateQueueVirtualStartTime(request *fq.Request, queue *fq.Queue) { + // When a request arrives to an empty queue with no requests executing: + // len(queue.Requests) == 1 as enqueue has just happened prior (vs == 0) + if len(queue.Requests) == 1 && queue.RequestsExecuting == 0 { + // the queue’s virtual start time is set to the virtual time. + queue.VirtualStart = qs.virtualTime + } +} + +// enqueues a request into an queueSet +func (qs *queueSet) enqueue(request *fq.Request) { + queue := request.Queue + queue.Enqueue(request) + qs.updateQueueVirtualStartTime(request, queue) + qs.numRequestsEnqueued++ + + metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued) +} + +// rejectOrEnqueueLocked rejects or enqueues the newly arrived request if +// resource criteria isn't met +func (qs *queueSet) rejectOrEnqueueLocked(request *fq.Request) bool { + queue := request.Queue + curQueueLength := len(queue.Requests) + // rejects the newly arrived request if resource criteria not met + if qs.getRequestsExecutingLocked() >= qs.config.ConcurrencyLimit && + curQueueLength >= qs.config.QueueLengthLimit { + return false + } + + qs.enqueue(request) + return true +} + +// selectQueue selects the minimum virtualFinish time from the set of queues +// the starting queue is selected via roundrobin +func (qs *queueSet) selectQueue() *fq.Queue { + minVirtualFinish := math.Inf(1) + var minQueue *fq.Queue + var minIndex int + for range qs.queues { + queue := qs.queues[qs.robinIndex] + if len(queue.Requests) != 0 { + currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime) + if currentVirtualFinish < minVirtualFinish { + minVirtualFinish = currentVirtualFinish + minQueue = queue + minIndex = qs.robinIndex + } + } + qs.robinIndex = (qs.robinIndex + 1) % len(qs.queues) + } + // we set the round robin indexing to start at the chose queue + // for the next round. This way the non-selected queues + // win in the case that the virtual finish times are the same + qs.robinIndex = minIndex + return minQueue +} + +// dequeue dequeues a request from the queueSet +func (qs *queueSet) dequeue() (*fq.Request, bool) { + queue := qs.selectQueue() + if queue == nil { + return nil, false + } + request, ok := queue.Dequeue() + if !ok { + return nil, false + } + // When a request is dequeued for service -> qs.VirtualStart += G + queue.VirtualStart += qs.estimatedServiceTime + request.StartTime = qs.clock.Now() + // request dequeued, service has started + queue.RequestsExecuting++ + metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, queue.RequestsExecuting) + qs.numRequestsEnqueued-- + return request, ok +} + +// dequeueWithChannelAsMuchAsPossible runs a loop, as long as there +// are non-empty queues and the number currently executing is less than the +// assured concurrency value. The body of the loop uses the fair queuing +// technique to pick a queue, dequeue the request at the head of that +// queue, increment the count of the number executing, and send true +// to the request's channel. +func (qs *queueSet) dequeueWithChannelAsMuchAsPossible() { + for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit { + _, ok := qs.dequeueWithChannel() + if !ok { + break + } + } +} + +// dequeueWithChannel is a convenience method for dequeueing requests that +// require a message to be sent through the requests channel +// this is a required pattern for the QueueSet the queueSet supports +func (qs *queueSet) dequeueWithChannel() (*fq.Request, bool) { + req, ok := qs.dequeue() + if !ok { + return nil, false + } + qs.counter.Add(1) + req.DequeueChannel <- true + close(req.DequeueChannel) + return req, ok +} + +// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice +// and then updates the 'Index' field of the queues to be correct +func removeQueueAndUpdateIndexes(queues []*fq.Queue, index int) []*fq.Queue { + keptQueues := append(queues[:index], queues[index+1:]...) + for i := index; i < len(keptQueues); i++ { + keptQueues[i].Index-- + } + return keptQueues +} + +// finishRequestLocked is a callback that should be used when a previously dequeued request +// has completed it's service. This callback updates important state in the +// queueSet +func (qs *queueSet) finishRequestLocked(r *fq.Request) { + S := qs.clock.Since(r.StartTime).Seconds() + + // When a request finishes being served, and the actual service time was S, + // the queue’s virtual start time is decremented by G - S. + r.Queue.VirtualStart -= qs.estimatedServiceTime - S + + // request has finished, remove from requests executing + r.Queue.RequestsExecuting-- + + // Logic to remove quiesced queues + // >= as QueueIdx=25 is out of bounds for DesiredNum=25 [0...24] + if r.Queue.Index >= qs.config.DesiredNumQueues && + len(r.Queue.Requests) == 0 && + r.Queue.RequestsExecuting == 0 { + qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.Queue.Index) + + // decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues + // is the index of the next queue after the one last dispatched from + if qs.robinIndex >= -r.Queue.Index { + qs.robinIndex-- + } + + // At this point, if the qs is quiescing, + // has zero requests executing, and has zero requests enqueued + // then a call to the EmptyHandler should be forked. + qs.maybeForkEmptyHandlerLocked() + } +} + +func (qs *queueSet) maybeForkEmptyHandlerLocked() { + if qs.emptyHandler != nil && qs.numRequestsEnqueued == 0 && + qs.getRequestsExecutingLocked() == 0 { + qs.counter.Add(1) + go func(eh fq.EmptyHandler) { + defer runtime.HandleCrash() + defer qs.counter.Add(-1) + eh.HandleEmpty() + }(qs.emptyHandler) + } +} + +// finishRequestAndDequeueWithChannelAsMuchAsPossible is a convenience method which calls finishRequest +// for a given request and then dequeues as many requests as possible +// and updates that request's channel signifying it is is dequeued +// this is a callback used for the filter that the queueSet supports +func (qs *queueSet) finishRequestAndDequeueWithChannelAsMuchAsPossible(req *fq.Request) { + qs.lockAndSyncTime() + defer qs.lock.Unlock() + + qs.finishRequestLocked(req) + qs.dequeueWithChannelAsMuchAsPossible() +} diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go new file mode 100644 index 000000000..7298984ae --- /dev/null +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -0,0 +1,212 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queueset + +import ( + "context" + "math" + "sync/atomic" + "testing" + "time" + + "k8s.io/apiserver/pkg/util/flowcontrol/counter" + fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" + test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" +) + +type uniformScenario []uniformClient + +type uniformClient struct { + hash uint64 + nThreads int + nCalls int + execDuration time.Duration + thinkDuration time.Duration +} + +// exerciseQueueSetUniformScenario. Simple logic, only works if each +// client's offered load is at least as large as its fair share of +// capacity. +func exerciseQueueSetUniformScenario(t *testing.T, qs fq.QueueSet, sc uniformScenario, + totalDuration time.Duration, expectPass bool, expectedAllRequests bool, + clk *clock.FakeEventClock, counter counter.GoRoutineCounter) { + + now := time.Now() + t.Logf("%s: Start", clk.Now().Format("2006-01-02 15:04:05.000000000")) + integrators := make([]test.Integrator, len(sc)) + var failedCount uint64 + for i, uc := range sc { + integrators[i] = test.NewIntegrator(clk) + for j := 0; j < uc.nThreads; j++ { + counter.Add(1) + go func(i, j int, uc uniformClient, igr test.Integrator) { + for k := 0; k < uc.nCalls; k++ { + ClockWait(clk, counter, uc.thinkDuration) + for { + tryAnother, execute, afterExecute := qs.Wait(context.Background(), uc.hash) + t.Logf("%s: %d, %d, %d got q=%v, e=%v", clk.Now().Format("2006-01-02 15:04:05.000000000"), i, j, k, tryAnother, execute) + if tryAnother { + continue + } + if !execute { + atomic.AddUint64(&failedCount, 1) + break + } + igr.Add(1) + ClockWait(clk, counter, uc.execDuration) + afterExecute() + igr.Add(-1) + break + } + } + counter.Add(-1) + }(i, j, uc, integrators[i]) + } + } + lim := now.Add(totalDuration) + clk.Run(&lim) + clk.SetTime(lim) + t.Logf("%s: End", clk.Now().Format("2006-01-02 15:04:05.000000000")) + results := make([]test.IntegratorResults, len(sc)) + var sumOfAvg float64 + for i := range sc { + results[i] = integrators[i].GetResults() + sumOfAvg += results[i].Average + } + idealAverage := sumOfAvg / float64(len(sc)) + passes := make([]bool, len(sc)) + allPass := true + for i := range sc { + relDiff := (results[i].Average - idealAverage) / idealAverage + passes[i] = math.Abs(relDiff) <= 0.1 + allPass = allPass && passes[i] + } + for i := range sc { + if allPass != expectPass { + t.Errorf("Class %d got an Average of %v but the ideal was %v", i, results[i].Average, idealAverage) + } else { + t.Logf("Class %d got an Average of %v and the ideal was %v", i, results[i].Average, idealAverage) + } + } + + clk.Run(nil) + if expectedAllRequests && failedCount > 0 { + t.Errorf("Expected all requests to be successful but got %v failed requests", failedCount) + } else if !expectedAllRequests && failedCount == 0 { + t.Errorf("Expected failed requests but all requests succeeded") + } +} + +// TestNoRestraint should fail because the dummy QueueSet exercises no control +func TestNoRestraint(t *testing.T) { + now := time.Now() + clk, counter := clock.NewFakeEventClock(now, 0, nil) + nrf := test.NewNoRestraintFactory() + config := fq.QueueSetConfig{} + nr, err := nrf.NewQueueSet(config) + if err != nil { + t.Fatalf("QueueSet creation failed with %v", err) + } + exerciseQueueSetUniformScenario(t, nr, []uniformClient{ + {1001001001, 5, 10, time.Second, time.Second}, + {2002002002, 2, 10, time.Second, time.Second / 2}, + }, time.Second*10, false, true, clk, counter) +} + +func TestUniformFlows(t *testing.T) { + now := time.Now() + + clk, counter := clock.NewFakeEventClock(now, 0, nil) + qsf := NewQueueSetFactory(clk, counter) + config := fq.QueueSetConfig{ + Name: "TestUniformFlows", + ConcurrencyLimit: 100, + DesiredNumQueues: 128, + QueueLengthLimit: 128, + HandSize: 1, + RequestWaitLimit: 10 * time.Minute, + } + qs, err := qsf.NewQueueSet(config) + if err != nil { + t.Fatalf("QueueSet creation failed with %v", err) + } + + exerciseQueueSetUniformScenario(t, qs, []uniformClient{ + {1001001001, 5, 10, time.Second, time.Second}, + {2002002002, 5, 10, time.Second, time.Second}, + }, time.Second*10, true, true, clk, counter) +} + +func TestDifferentFlows(t *testing.T) { + now := time.Now() + + clk, counter := clock.NewFakeEventClock(now, 0, nil) + qsf := NewQueueSetFactory(clk, counter) + config := fq.QueueSetConfig{ + Name: "TestDifferentFlows", + ConcurrencyLimit: 1, + DesiredNumQueues: 128, + QueueLengthLimit: 128, + HandSize: 1, + RequestWaitLimit: 10 * time.Minute, + } + qs, err := qsf.NewQueueSet(config) + if err != nil { + t.Fatalf("QueueSet creation failed with %v", err) + } + + exerciseQueueSetUniformScenario(t, qs, []uniformClient{ + {1001001001, 5, 10, time.Second, time.Second}, + {2002002002, 2, 5, time.Second, time.Second / 2}, + }, time.Second*10, true, true, clk, counter) +} + +func TestTimeout(t *testing.T) { + now := time.Now() + + clk, counter := clock.NewFakeEventClock(now, 0, nil) + qsf := NewQueueSetFactory(clk, counter) + config := fq.QueueSetConfig{ + Name: "TestTimeout", + ConcurrencyLimit: 1, + DesiredNumQueues: 128, + QueueLengthLimit: 128, + HandSize: 1, + RequestWaitLimit: 0, + } + qs, err := qsf.NewQueueSet(config) + if err != nil { + t.Fatalf("QueueSet creation failed with %v", err) + } + + exerciseQueueSetUniformScenario(t, qs, []uniformClient{ + {1001001001, 5, 100, time.Second, time.Second}, + }, time.Second*10, true, false, clk, counter) +} + +func ClockWait(clk *clock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) { + dunch := make(chan struct{}) + clk.EventAfterDuration(func(time.Time) { + counter.Add(1) + close(dunch) + }, duration) + counter.Add(-1) + select { + case <-dunch: + } +} diff --git a/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock.go b/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock.go new file mode 100644 index 000000000..089a2df55 --- /dev/null +++ b/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock.go @@ -0,0 +1,222 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clock + +import ( + "container/heap" + "math/rand" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apiserver/pkg/util/flowcontrol/counter" +) + +// EventFunc does some work that needs to be done at or after the +// given time. After this function returns, associated work may continue +// on other goroutines only if they are counted by the GoRoutineCounter +// of the FakeEventClock handling this EventFunc. +type EventFunc func(time.Time) + +// EventClock fires event on time +type EventClock interface { + clock.PassiveClock + EventAfterDuration(f EventFunc, d time.Duration) + EventAfterTime(f EventFunc, t time.Time) +} + +// RealEventClock fires event on real world time +type RealEventClock struct { + clock.RealClock +} + +// EventAfterDuration schedules an EventFunc +func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) { + ch := time.After(d) + go func() { + select { + case t := <-ch: + f(t) + } + }() +} + +// EventAfterTime schedules an EventFunc +func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) { + now := time.Now() + d := t.Sub(now) + if d <= 0 { + go f(now) + } else { + r.EventAfterDuration(f, d) + } +} + +// waitGroupCounter is a wait group used for a GoRoutine Counter. This private +// type is used to disallow direct waitGroup access +type waitGroupCounter struct{ sync.WaitGroup } + +// compile time assertion that waitGroupCounter meets requirements +// of GoRoutineCounter +var _ counter.GoRoutineCounter = (*waitGroupCounter)(nil) + +// FakeEventClock is one whose time does not pass implicitly but +// rather is explicitly set by invocations of its SetTime method +type FakeEventClock struct { + clock.FakePassiveClock + + // waiters is a heap of waiting work, sorted by time + waiters eventWaiterHeap + waitersLock sync.RWMutex + + // clientWG may be nil and if not supplies constraints on time + // passing in Run. The Run method will not pick a new time until + // this is nil or its counter is zero. + clientWG *waitGroupCounter + + // fuzz is the amount of noise to add to scheduling. An event + // requested to run at time T will run at some time chosen + // uniformly at random from the interval [T, T+fuzz]; the upper + // bound is exclusive iff fuzz is non-zero. + fuzz time.Duration + + // rand is the random number generator to use in fuzzing + rand *rand.Rand +} + +type eventWaiterHeap []eventWaiter + +var _ heap.Interface = (*eventWaiterHeap)(nil) + +type eventWaiter struct { + targetTime time.Time + f EventFunc +} + +// NewFakeEventClock constructor. The given `r *rand.Rand` must +// henceforth not be used for any other purpose. If `r` is nil then a +// fresh one will be constructed, seeded with the current real time. +// The clientWG can be `nil` and if not is used to let Run know about +// additional work that has to complete before time can advance. +func NewFakeEventClock(t time.Time, fuzz time.Duration, r *rand.Rand) (*FakeEventClock, counter.GoRoutineCounter) { + grc := &waitGroupCounter{} + + if r == nil { + r = rand.New(rand.NewSource(time.Now().UnixNano())) + r.Uint64() + r.Uint64() + r.Uint64() + } + return &FakeEventClock{ + FakePassiveClock: *clock.NewFakePassiveClock(t), + clientWG: grc, + fuzz: fuzz, + rand: r, + }, grc +} + +// GetNextTime returns the next time at which there is work scheduled, +// and a bool indicating whether there is any such time +func (fec *FakeEventClock) GetNextTime() (time.Time, bool) { + fec.waitersLock.RLock() + defer fec.waitersLock.RUnlock() + if len(fec.waiters) > 0 { + return fec.waiters[0].targetTime, true + } + return time.Time{}, false +} + +// Run runs all the events scheduled, and all the events they +// schedule, and so on, until there are none scheduled or the limit is not +// nil and the next time would exceed the limit. The clientWG given in +// the constructor gates each advance of time. +func (fec *FakeEventClock) Run(limit *time.Time) { + for { + fec.clientWG.Wait() + t, ok := fec.GetNextTime() + if !ok || limit != nil && t.After(*limit) { + break + } + fec.SetTime(t) + } +} + +// SetTime sets the time and runs to completion all events that should +// be started by the given time --- including any further events they +// schedule +func (fec *FakeEventClock) SetTime(t time.Time) { + fec.FakePassiveClock.SetTime(t) + for { + foundSome := false + func() { + fec.waitersLock.Lock() + defer fec.waitersLock.Unlock() + // This loop is because events run at a given time may schedule more + // events to run at that or an earlier time. + // Events should not advance the clock. But just in case they do... + now := fec.Now() + var wg sync.WaitGroup + for len(fec.waiters) > 0 && !now.Before(fec.waiters[0].targetTime) { + ew := heap.Pop(&fec.waiters).(eventWaiter) + wg.Add(1) + go func(f EventFunc) { f(now); wg.Done() }(ew.f) + foundSome = true + } + wg.Wait() + }() + if !foundSome { + break + } + } +} + +// EventAfterDuration schedules the given function to be invoked once +// the given duration has passed. +func (fec *FakeEventClock) EventAfterDuration(f EventFunc, d time.Duration) { + fec.waitersLock.Lock() + defer fec.waitersLock.Unlock() + now := fec.Now() + fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32()) + heap.Push(&fec.waiters, eventWaiter{targetTime: now.Add(d + fd), f: f}) +} + +// EventAfterTime schedules the given function to be invoked once +// the given time has arrived. +func (fec *FakeEventClock) EventAfterTime(f EventFunc, t time.Time) { + fec.waitersLock.Lock() + defer fec.waitersLock.Unlock() + fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32()) + heap.Push(&fec.waiters, eventWaiter{targetTime: t.Add(fd), f: f}) +} + +func (ewh eventWaiterHeap) Len() int { return len(ewh) } + +func (ewh eventWaiterHeap) Less(i, j int) bool { return ewh[i].targetTime.Before(ewh[j].targetTime) } + +func (ewh eventWaiterHeap) Swap(i, j int) { ewh[i], ewh[j] = ewh[j], ewh[i] } + +func (ewh *eventWaiterHeap) Push(x interface{}) { + *ewh = append(*ewh, x.(eventWaiter)) +} + +func (ewh *eventWaiterHeap) Pop() interface{} { + old := *ewh + n := len(old) + x := old[n-1] + *ewh = old[:n-1] + return x +} diff --git a/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock_test.go b/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock_test.go new file mode 100644 index 000000000..2cd58ef57 --- /dev/null +++ b/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock_test.go @@ -0,0 +1,183 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clock + +import ( + "math/rand" + "sync/atomic" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/clock" +) + +type TestableEventClock interface { + EventClock + SetTime(time.Time) + Run(*time.Time) +} + +// settablePassiveClock allows setting current time of a passive clock +type settablePassiveClock interface { + clock.PassiveClock + SetTime(time.Time) +} + +func exerciseTestableEventClock(t *testing.T, ec TestableEventClock, fuzz time.Duration) { + exercisePassiveClock(t, ec) + var numDone int32 + now := ec.Now() + strictable := true + const batchSize = 100 + times := make(chan time.Time, batchSize+1) + try := func(abs, strict bool, d time.Duration) { + f := func(u time.Time) { + realD := ec.Since(now) + atomic.AddInt32(&numDone, 1) + times <- u + if realD < d || strict && strictable && realD > d+fuzz { + t.Errorf("Asked for %v, got %v", d, realD) + } + } + if abs { + ec.EventAfterTime(f, now.Add(d)) + } else { + ec.EventAfterDuration(f, d) + } + } + try(true, true, time.Minute) + for i := 0; i < batchSize; i++ { + d := time.Duration(rand.Intn(30)-3) * time.Second + try(i%2 == 0, d >= 0, d) + } + ec.Run(nil) + if numDone != batchSize+1 { + t.Errorf("Got only %v events", numDone) + } + lastTime := now.Add(-3 * time.Second) + for i := 0; i <= batchSize; i++ { + nextTime := <-times + if nextTime.Before(lastTime) { + t.Errorf("Got %s after %s", nextTime, lastTime) + } + } + endTime := ec.Now() + dx := endTime.Sub(now) + if dx > time.Minute+fuzz { + t.Errorf("Run started at %#+v, ended at %#+v, dx=%d", now, endTime, dx) + } + now = endTime + var shouldRun int32 + strictable = false + for i := 0; i < batchSize; i++ { + d := time.Duration(rand.Intn(30)-3) * time.Second + try(i%2 == 0, d >= 0, d) + if d <= 12*time.Second { + shouldRun++ + } + } + ec.SetTime(now.Add(13*time.Second - 1)) + if numDone != batchSize+1+shouldRun { + t.Errorf("Expected %v, but %v ran", shouldRun, numDone-batchSize-1) + } + lastTime = now.Add(-3 * time.Second) + for i := int32(0); i < shouldRun; i++ { + nextTime := <-times + if nextTime.Before(lastTime) { + t.Errorf("Got %s after %s", nextTime, lastTime) + } + lastTime = nextTime + } +} + +func exercisePassiveClock(t *testing.T, pc settablePassiveClock) { + t1 := time.Now() + t2 := t1.Add(time.Hour) + pc.SetTime(t1) + tx := pc.Now() + if tx != t1 { + t.Errorf("SetTime(%#+v); Now() => %#+v", t1, tx) + } + dx := pc.Since(t1) + if dx != 0 { + t.Errorf("Since() => %v", dx) + } + pc.SetTime(t2) + dx = pc.Since(t1) + if dx != time.Hour { + t.Errorf("Since() => %v", dx) + } + tx = pc.Now() + if tx != t2 { + t.Errorf("Now() => %#+v", tx) + } +} + +func TestFakeEventClock(t *testing.T) { + startTime := time.Now() + fec, _ := NewFakeEventClock(startTime, 0, nil) + exerciseTestableEventClock(t, fec, 0) + fec, _ = NewFakeEventClock(startTime, time.Second, nil) + exerciseTestableEventClock(t, fec, time.Second) +} + +func exerciseEventClock(t *testing.T, ec EventClock, relax func(time.Duration)) { + var numDone int32 + now := ec.Now() + const batchSize = 100 + times := make(chan time.Time, batchSize+1) + try := func(abs bool, d time.Duration) { + f := func(u time.Time) { + realD := ec.Since(now) + atomic.AddInt32(&numDone, 1) + times <- u + if realD < d { + t.Errorf("Asked for %v, got %v", d, realD) + } + } + if abs { + ec.EventAfterTime(f, now.Add(d)) + } else { + ec.EventAfterDuration(f, d) + } + } + try(true, time.Millisecond*3300) + for i := 0; i < batchSize; i++ { + d := time.Duration(rand.Intn(30)-3) * time.Millisecond * 100 + try(i%2 == 0, d) + } + relax(time.Second * 4) + if atomic.LoadInt32(&numDone) != batchSize+1 { + t.Errorf("Got only %v events", numDone) + } + lastTime := now + for i := 0; i <= batchSize; i++ { + nextTime := <-times + if nextTime.Before(now) { + continue + } + dt := nextTime.Sub(lastTime) / (50 * time.Millisecond) + if dt < 0 { + t.Errorf("Got %s after %s", nextTime, lastTime) + } + lastTime = nextTime + } +} + +func TestRealEventClock(t *testing.T) { + exerciseEventClock(t, RealEventClock{}, func(d time.Duration) { time.Sleep(d) }) +} diff --git a/pkg/util/flowcontrol/fairqueuing/testing/integrator.go b/pkg/util/flowcontrol/fairqueuing/testing/integrator.go new file mode 100644 index 000000000..22c75b7f2 --- /dev/null +++ b/pkg/util/flowcontrol/fairqueuing/testing/integrator.go @@ -0,0 +1,103 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "math" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/clock" +) + +// Integrator computes the integral of some variable X over time as +// read from a particular clock. The integral starts when the +// Integrator is created, and ends at the latest operation on the +// Integrator. +type Integrator interface { + Set(float64) // set the value of X + Add(float64) // add the given quantity to X + GetResults() IntegratorResults +} + +// IntegratorResults holds statistical abstracts of the integration +type IntegratorResults struct { + Duration float64 //seconds + Average float64 + Deviation float64 //sqrt(avg((value-avg)^2)) +} + +type integrator struct { + sync.Mutex + clk clock.PassiveClock + lastTime time.Time + x float64 + integrals [3]float64 // integral of x^0, x^1, and x^2 +} + +// NewIntegrator makes one that uses the given clock +func NewIntegrator(clk clock.PassiveClock) Integrator { + return &integrator{ + clk: clk, + lastTime: clk.Now(), + } +} + +func (igr *integrator) Set(x float64) { + igr.Lock() + igr.updateLocked() + igr.x = x + igr.Unlock() +} + +func (igr *integrator) Add(deltaX float64) { + igr.Lock() + igr.updateLocked() + igr.x += deltaX + igr.Unlock() +} + +func (igr *integrator) updateLocked() { + now := igr.clk.Now() + dt := now.Sub(igr.lastTime).Seconds() + igr.lastTime = now + igr.integrals[0] += dt + igr.integrals[1] += dt * igr.x + igr.integrals[2] += dt * igr.x * igr.x +} + +func (igr *integrator) GetResults() (results IntegratorResults) { + igr.Lock() + defer func() { igr.Unlock() }() + igr.updateLocked() + results.Duration = igr.integrals[0] + if results.Duration <= 0 { + results.Average = math.NaN() + results.Deviation = math.NaN() + return + } + results.Average = igr.integrals[1] / igr.integrals[0] + // Deviation is sqrt( Integral( (x - xbar)^2 dt) / Duration ) + // = sqrt( Integral( x^2 + xbar^2 -2*x*xbar dt ) / Duration ) + // = sqrt( ( Integral( x^2 dt ) + Duration * xbar^2 - 2*xbar*Integral(x dt) ) / Duration) + // = sqrt( Integral(x^2 dt)/Duration - xbar^2 ) + variance := igr.integrals[2]/igr.integrals[0] - results.Average*results.Average + if variance > 0 { + results.Deviation = math.Sqrt(variance) + } + return +} diff --git a/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go new file mode 100644 index 000000000..f8aab295b --- /dev/null +++ b/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -0,0 +1,49 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "context" + + fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" +) + +// NewNoRestraintFactory makes a QueueSetFactory that produces +// QueueSets that exert no restraint --- every request is dispatched +// for execution as soon as it arrives. +func NewNoRestraintFactory() fq.QueueSetFactory { + return noRestraintFactory{} +} + +type noRestraintFactory struct{} + +func (noRestraintFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { + return noRestraint{}, nil +} + +type noRestraint struct{} + +func (noRestraint) SetConfiguration(config fq.QueueSetConfig) error { + return nil +} + +func (noRestraint) Quiesce(fq.EmptyHandler) { +} + +func (noRestraint) Wait(ctx context.Context, hashValue uint64) (quiescent, execute bool, afterExecution func()) { + return false, true, func() {} +} diff --git a/pkg/util/flowcontrol/fairqueuing/types.go b/pkg/util/flowcontrol/fairqueuing/types.go new file mode 100644 index 000000000..5ed89ad89 --- /dev/null +++ b/pkg/util/flowcontrol/fairqueuing/types.go @@ -0,0 +1,73 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fairqueuing + +import ( + "time" +) + +// Request is a temporary container for "requests" with additional tracking fields +// required for the functionality FQScheduler +type Request struct { + //TODO(aaron-prindle) seq is only used for testing, this was abstracted + // via an interface before, keeping this for now + QueueIdx int + + Queue *Queue + StartTime time.Time + DequeueChannel chan bool + RealEnqueueTime time.Time + Enqueued bool +} + +// Queue is an array of requests with additional metadata required for +// the FQScheduler +type Queue struct { + Requests []*Request + VirtualStart float64 + RequestsExecuting int + Index int +} + +// Enqueue enqueues a request into the queue +func (q *Queue) Enqueue(request *Request) { + request.Enqueued = true + q.Requests = append(q.Requests, request) +} + +// Dequeue dequeues a request from the queue +func (q *Queue) Dequeue() (*Request, bool) { + if len(q.Requests) == 0 { + return nil, false + } + request := q.Requests[0] + q.Requests = q.Requests[1:] + + request.Enqueued = false + return request, true +} + +// GetVirtualFinish returns the expected virtual finish time of the request at +// index J in the queue with estimated finish time G +func (q *Queue) GetVirtualFinish(J int, G float64) float64 { + // The virtual finish time of request number J in the queue + // (counting from J=1 for the head) is J * G + (virtual start time). + + // counting from J=1 for the head (eg: queue.Requests[0] -> J=1) - J+1 + jg := float64(J+1) * float64(G) + return jg + q.VirtualStart +} diff --git a/pkg/util/flowcontrol/metrics/metrics.go b/pkg/util/flowcontrol/metrics/metrics.go new file mode 100644 index 000000000..2e026b9c3 --- /dev/null +++ b/pkg/util/flowcontrol/metrics/metrics.go @@ -0,0 +1,152 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + namespace = "apiserver" + subsystem = "flowcontrol" +) + +const ( + priorityLevel = "priorityLevel" + flowSchema = "flowSchema" +) + +var ( + queueLengthBuckets = []float64{0, 10, 25, 50, 100, 250, 500, 1000} + requestDurationSecondsBuckets = []float64{0, 0.005, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30} +) + +func init() { + prometheus.MustRegister(apiserverRejectedRequests) + prometheus.MustRegister(apiserverCurrentInqueueRequests) + prometheus.MustRegister(apiserverRequestQueueLength) + prometheus.MustRegister(apiserverRequestConcurrencyLimit) + prometheus.MustRegister(apiserverCurrentExecutingRequests) + prometheus.MustRegister(apiserverRequestWaitingSeconds) + prometheus.MustRegister(apiserverRequestExecutionSeconds) +} + +var ( + apiserverRejectedRequests = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "rejectedRequests", + Help: "Number of rejected requests by api priority and fairness system", + }, + []string{priorityLevel, "reason"}, + ) + apiserverCurrentInqueueRequests = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "currentInqueueRequests", + Help: "Number of requests currently pending in the queue by the api priority and fairness system", + }, + []string{priorityLevel}, + ) + apiserverRequestQueueLength = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "requestQueueLength", + Help: "Length of queue in the api priority and fairness system", + Buckets: queueLengthBuckets, + }, + []string{priorityLevel}, + ) + apiserverRequestConcurrencyLimit = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "requestConcurrencyLimit", + Help: "Shared concurrency limit in the api priority and fairness system", + }, + []string{priorityLevel}, + ) + apiserverCurrentExecutingRequests = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "currentExecutingRequests", + Help: "Number of requests currently executing in the api priority and fairness system", + }, + []string{priorityLevel}, + ) + apiserverRequestWaitingSeconds = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "request_wait_durationSeconds", + Help: "Length of time a request spent waiting in its queue", + Buckets: requestDurationSecondsBuckets, + }, + []string{priorityLevel, flowSchema, "execute"}, + ) + apiserverRequestExecutionSeconds = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "requestExecutionSeconds", + Help: "Time of request executing in the api priority and fairness system", + Buckets: requestDurationSecondsBuckets, + }, + []string{priorityLevel, flowSchema}, + ) +) + +// UpdateFlowControlRequestsInQueue updates the value for the # of requests in the specified queues in flow control +func UpdateFlowControlRequestsInQueue(priorityLevel string, inqueue int) { + apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel).Set(float64(inqueue)) +} + +// UpdateFlowControlRequestsExecuting updates the value for the # of requests executing in flow control +func UpdateFlowControlRequestsExecuting(priorityLevel string, executing int) { + apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel).Set(float64(executing)) +} + +// UpdateSharedConcurrencyLimit updates the value for the concurrency limit in flow control +func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) { + apiserverRequestConcurrencyLimit.WithLabelValues(priorityLevel).Set(float64(limit)) +} + +// AddReject increments the # of rejected requests for flow control +func AddReject(priorityLevel string, reason string) { + apiserverRejectedRequests.WithLabelValues(priorityLevel, reason).Add(1) +} + +// ObserveQueueLength observes the queue length for flow control +func ObserveQueueLength(priorityLevel string, length int) { + apiserverRequestQueueLength.WithLabelValues(priorityLevel).Observe(float64(length)) +} + +// ObserveWaitingDuration observes the queue length for flow control +func ObserveWaitingDuration(priorityLevel, flowSchema, execute string, waitTime time.Duration) { + apiserverRequestWaitingSeconds.WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds()) +} + +// ObserveExecutionDuration observes the execution duration for flow control +func ObserveExecutionDuration(priorityLevel, flowSchema string, executionTime time.Duration) { + apiserverRequestExecutionSeconds.WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds()) +}