introduce apf debug endpoint
Kubernetes-commit: 567becd5eedd9a1f3802f2b3b0b6b8efc445d2a1
This commit is contained in:
parent
3b3b51070b
commit
7f1f8c2578
|
@ -53,6 +53,7 @@ import (
|
|||
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
||||
apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi"
|
||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic"
|
||||
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
||||
"k8s.io/apiserver/pkg/server/egressselector"
|
||||
|
@ -60,6 +61,7 @@ import (
|
|||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/apiserver/pkg/server/routes"
|
||||
serverstore "k8s.io/apiserver/pkg/server/storage"
|
||||
"k8s.io/apiserver/pkg/util/feature"
|
||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||
"k8s.io/client-go/informers"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
|
@ -709,6 +711,9 @@ func installAPI(s *GenericAPIServer, c *Config) {
|
|||
if c.EnableDiscovery {
|
||||
s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
|
||||
}
|
||||
if feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) {
|
||||
c.FlowControl.Install(s.Handler.NonGoRestfulMux)
|
||||
}
|
||||
}
|
||||
|
||||
func NewRequestInfoResolver(c *Config) *apirequest.RequestInfoFactory {
|
||||
|
|
|
@ -641,14 +641,15 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige
|
|||
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
|
||||
|
||||
}
|
||||
var flowDistinguisher string
|
||||
var hashValue uint64
|
||||
if numQueues > 1 {
|
||||
flowDistinguisher := computeFlowDistinguisher(rd, fs.Spec.DistinguisherMethod)
|
||||
flowDistinguisher = computeFlowDistinguisher(rd, fs.Spec.DistinguisherMethod)
|
||||
hashValue = hashFlowID(fs.Name, flowDistinguisher)
|
||||
}
|
||||
startWaitingTime = time.Now()
|
||||
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, fs.Name, fs.Spec.DistinguisherMethod, plName, numQueues)
|
||||
req, idle := plState.queues.StartRequest(ctx, hashValue, fs.Name, rd.RequestInfo, rd.User)
|
||||
req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, fs.Name, rd.RequestInfo, rd.User)
|
||||
if idle {
|
||||
cfgCtl.maybeReapLocked(plName, plState)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,277 @@
|
|||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apiserver/pkg/server/mux"
|
||||
)
|
||||
|
||||
const (
|
||||
queryIncludeRequestDetails = "includeRequestDetails"
|
||||
)
|
||||
|
||||
func (cfgCtl *configController) Install(c *mux.PathRecorderMux) {
|
||||
// TODO(yue9944882): handle "Accept" header properly
|
||||
// debugging dumps a CSV content for three levels of granularity
|
||||
// 1. row per priority-level
|
||||
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_priority_levels", cfgCtl.dumpPriorityLevels)
|
||||
// 2. row per queue
|
||||
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_queues", cfgCtl.dumpQueues)
|
||||
// 3. row per request
|
||||
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_requests", cfgCtl.dumpRequests)
|
||||
}
|
||||
|
||||
func (cfgCtl *configController) dumpPriorityLevels(w http.ResponseWriter, r *http.Request) {
|
||||
cfgCtl.lock.Lock()
|
||||
defer cfgCtl.lock.Unlock()
|
||||
tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0)
|
||||
columnHeaders := []string{
|
||||
"PriorityLevelName", // 1
|
||||
"ActiveQueues", // 2
|
||||
"IsIdle", // 3
|
||||
"IsQuiescing", // 4
|
||||
"WaitingRequests", // 5
|
||||
"ExecutingRequests", // 6
|
||||
}
|
||||
tabPrint(tabWriter, rowForHeaders(columnHeaders))
|
||||
endline(tabWriter)
|
||||
for _, plState := range cfgCtl.priorityLevelStates {
|
||||
if plState.queues == nil {
|
||||
tabPrint(tabWriter, row(
|
||||
plState.pl.Name, // 1
|
||||
"<none>", // 2
|
||||
"<none>", // 3
|
||||
"<none>", // 4
|
||||
"<none>", // 5
|
||||
"<none>", // 6
|
||||
))
|
||||
endline(tabWriter)
|
||||
continue
|
||||
}
|
||||
queueSetDigest := plState.queues.Dump(false)
|
||||
activeQueueNum := 0
|
||||
for _, q := range queueSetDigest.Queues {
|
||||
if len(q.Requests) > 0 {
|
||||
activeQueueNum++
|
||||
}
|
||||
}
|
||||
|
||||
tabPrint(tabWriter, rowForPriorityLevel(
|
||||
plState.pl.Name, // 1
|
||||
activeQueueNum, // 2
|
||||
plState.queues.IsIdle(), // 3
|
||||
plState.quiescing, // 4
|
||||
queueSetDigest.Waiting, // 5
|
||||
queueSetDigest.Executing, // 6
|
||||
))
|
||||
endline(tabWriter)
|
||||
}
|
||||
runtime.HandleError(tabWriter.Flush())
|
||||
}
|
||||
|
||||
func (cfgCtl *configController) dumpQueues(w http.ResponseWriter, r *http.Request) {
|
||||
cfgCtl.lock.Lock()
|
||||
defer cfgCtl.lock.Unlock()
|
||||
tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0)
|
||||
columnHeaders := []string{
|
||||
"PriorityLevelName", // 1
|
||||
"Index", // 2
|
||||
"PendingRequests", // 3
|
||||
"ExecutingRequests", // 4
|
||||
"VirtualStart", // 5
|
||||
}
|
||||
tabPrint(tabWriter, rowForHeaders(columnHeaders))
|
||||
endline(tabWriter)
|
||||
for _, plState := range cfgCtl.priorityLevelStates {
|
||||
if plState.queues == nil {
|
||||
tabPrint(tabWriter, row(
|
||||
plState.pl.Name, // 1
|
||||
"<none>", // 2
|
||||
"<none>", // 3
|
||||
"<none>", // 4
|
||||
"<none>", // 5
|
||||
))
|
||||
endline(tabWriter)
|
||||
continue
|
||||
}
|
||||
queueSetDigest := plState.queues.Dump(false)
|
||||
for i, q := range queueSetDigest.Queues {
|
||||
tabPrint(tabWriter, rowForQueue(
|
||||
plState.pl.Name, // 1
|
||||
i, // 2
|
||||
len(q.Requests), // 3
|
||||
q.ExecutingRequests, // 4
|
||||
q.VirtualStart, // 5
|
||||
))
|
||||
endline(tabWriter)
|
||||
}
|
||||
}
|
||||
runtime.HandleError(tabWriter.Flush())
|
||||
}
|
||||
|
||||
func (cfgCtl *configController) dumpRequests(w http.ResponseWriter, r *http.Request) {
|
||||
cfgCtl.lock.Lock()
|
||||
defer cfgCtl.lock.Unlock()
|
||||
|
||||
includeRequestDetails := len(r.URL.Query().Get(queryIncludeRequestDetails)) > 0
|
||||
|
||||
tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0)
|
||||
tabPrint(tabWriter, rowForHeaders([]string{
|
||||
"PriorityLevelName", // 1
|
||||
"FlowSchemaName", // 2
|
||||
"QueueIndex", // 3
|
||||
"RequestIndexInQueue", // 4
|
||||
"FlowDistingsher", // 5
|
||||
"ArriveTime", // 6
|
||||
}))
|
||||
if includeRequestDetails {
|
||||
tabPrint(tabWriter, rowForHeaders([]string{
|
||||
"UserName", // 7
|
||||
"Verb", // 8
|
||||
"APIPath", // 9
|
||||
"Namespace", // 10
|
||||
"Name", // 11
|
||||
"APIVersion", // 12
|
||||
"Resource", // 13
|
||||
"SubResource", // 14
|
||||
}))
|
||||
}
|
||||
endline(tabWriter)
|
||||
for _, plState := range cfgCtl.priorityLevelStates {
|
||||
if plState.queues == nil {
|
||||
tabPrint(tabWriter, row(
|
||||
plState.pl.Name, // 1
|
||||
"<none>", // 2
|
||||
"<none>", // 3
|
||||
"<none>", // 4
|
||||
"<none>", // 5
|
||||
"<none>", // 6
|
||||
))
|
||||
if includeRequestDetails {
|
||||
tabPrint(tabWriter, row(
|
||||
"<none>", // 7
|
||||
"<none>", // 8
|
||||
"<none>", // 9
|
||||
"<none>", // 10
|
||||
"<none>", // 11
|
||||
"<none>", // 12
|
||||
"<none>", // 13
|
||||
"<none>", // 14
|
||||
))
|
||||
}
|
||||
endline(tabWriter)
|
||||
continue
|
||||
}
|
||||
queueSetDigest := plState.queues.Dump(includeRequestDetails)
|
||||
for iq, q := range queueSetDigest.Queues {
|
||||
for ir, r := range q.Requests {
|
||||
tabPrint(tabWriter, rowForRequest(
|
||||
plState.pl.Name, // 1
|
||||
r.MatchedFlowSchema, // 2
|
||||
iq, // 3
|
||||
ir, // 4
|
||||
r.FlowDistinguisher, // 5
|
||||
r.ArriveTime, // 6
|
||||
))
|
||||
if includeRequestDetails {
|
||||
tabPrint(tabWriter, rowForRequestDetails(
|
||||
r.UserName, // 7
|
||||
r.RequestInfo.Verb, // 8
|
||||
r.RequestInfo.Path, // 9
|
||||
r.RequestInfo.Namespace, // 10
|
||||
r.RequestInfo.Name, // 11
|
||||
schema.GroupVersion{
|
||||
Group: r.RequestInfo.APIGroup,
|
||||
Version: r.RequestInfo.APIVersion,
|
||||
}.String(), // 12
|
||||
r.RequestInfo.Resource, // 13
|
||||
r.RequestInfo.Subresource, // 14
|
||||
))
|
||||
}
|
||||
endline(tabWriter)
|
||||
}
|
||||
}
|
||||
}
|
||||
runtime.HandleError(tabWriter.Flush())
|
||||
}
|
||||
|
||||
func tabPrint(w io.Writer, row string) {
|
||||
_, err := fmt.Fprint(w, row)
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
func endline(w io.Writer) {
|
||||
_, err := fmt.Fprint(w, "\n")
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
|
||||
func rowForHeaders(headers []string) string {
|
||||
return row(headers...)
|
||||
}
|
||||
|
||||
func rowForPriorityLevel(plName string, activeQueues int, isIdle, isQuiescing bool, waitingRequests, executingRequests int) string {
|
||||
return row(
|
||||
plName,
|
||||
strconv.Itoa(activeQueues),
|
||||
strconv.FormatBool(isIdle),
|
||||
strconv.FormatBool(isQuiescing),
|
||||
strconv.Itoa(waitingRequests),
|
||||
strconv.Itoa(executingRequests),
|
||||
)
|
||||
}
|
||||
|
||||
func rowForQueue(plName string, index, waitingRequests, executingRequests int, virtualStart float64) string {
|
||||
return row(
|
||||
plName,
|
||||
strconv.Itoa(index),
|
||||
strconv.Itoa(waitingRequests),
|
||||
strconv.Itoa(executingRequests),
|
||||
fmt.Sprintf("%.4f", virtualStart),
|
||||
)
|
||||
}
|
||||
|
||||
func rowForRequest(plName, fsName string, queueIndex, requestIndex int, flowDistinguisher string, arriveTime time.Time) string {
|
||||
return row(
|
||||
plName,
|
||||
fsName,
|
||||
strconv.Itoa(queueIndex),
|
||||
strconv.Itoa(requestIndex),
|
||||
flowDistinguisher,
|
||||
arriveTime.UTC().Format(time.RFC3339Nano),
|
||||
)
|
||||
}
|
||||
|
||||
func rowForRequestDetails(username, verb, path, namespace, name, apiVersion, resource, subResource string) string {
|
||||
return row(
|
||||
username,
|
||||
verb,
|
||||
path,
|
||||
)
|
||||
}
|
||||
|
||||
func row(columns ...string) string {
|
||||
return strings.Join(columns, ",\t") + ",\t"
|
||||
}
|
|
@ -22,6 +22,7 @@ import (
|
|||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apiserver/pkg/server/mux"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
|
||||
|
@ -51,6 +52,9 @@ type Interface interface {
|
|||
// any needed changes to local behavior. This method ceases
|
||||
// activity and returns after the given channel is closed.
|
||||
Run(stopCh <-chan struct{}) error
|
||||
|
||||
// Install installs debugging endpoints to the web-server.
|
||||
Install(c *mux.PathRecorderMux)
|
||||
}
|
||||
|
||||
// This request filter implements https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
|
||||
"k8s.io/client-go/informers"
|
||||
|
@ -95,6 +96,10 @@ func (cqs *ctlTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSetC
|
|||
return ctlTestQueueSetCompleter{cqs.cts, cqs, qc}, nil
|
||||
}
|
||||
|
||||
func (cqs *ctlTestQueueSet) Dump(bool) debug.QueueSetDump {
|
||||
return debug.QueueSetDump{}
|
||||
}
|
||||
|
||||
func (cqc ctlTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet {
|
||||
cqc.cts.lock.Lock()
|
||||
defer cqc.cts.lock.Unlock()
|
||||
|
@ -115,7 +120,7 @@ func (cqs *ctlTestQueueSet) IsIdle() bool {
|
|||
return cqs.countActive == 0
|
||||
}
|
||||
|
||||
func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (req fq.Request, idle bool) {
|
||||
func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req fq.Request, idle bool) {
|
||||
cqs.cts.lock.Lock()
|
||||
defer cqs.cts.lock.Unlock()
|
||||
cqs.countActive++
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package debug
|
||||
|
||||
import (
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"time"
|
||||
)
|
||||
|
||||
// QueueSetDump is an instant dump of queue-set.
|
||||
type QueueSetDump struct {
|
||||
Queues []QueueDump
|
||||
Waiting int
|
||||
Executing int
|
||||
}
|
||||
|
||||
// QueueDump is an instant dump of one queue in a queue-set.
|
||||
type QueueDump struct {
|
||||
Requests []RequestDump
|
||||
VirtualStart float64
|
||||
ExecutingRequests int
|
||||
}
|
||||
|
||||
// RequestDump is an instant dump of one requests pending in the queue.
|
||||
type RequestDump struct {
|
||||
MatchedFlowSchema string
|
||||
FlowDistinguisher string
|
||||
ArriveTime time.Time
|
||||
StartTime time.Time
|
||||
// request details
|
||||
UserName string
|
||||
RequestInfo request.RequestInfo
|
||||
}
|
|
@ -19,6 +19,8 @@ package fairqueuing
|
|||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||
)
|
||||
|
||||
// QueueSetFactory is used to create QueueSet objects. Creation, like
|
||||
|
@ -77,7 +79,13 @@ type QueueSet interface {
|
|||
// was idle at the moment of the return. Otherwise idle==false
|
||||
// and the client must call the Finish method of the Request
|
||||
// exactly once.
|
||||
StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (req Request, idle bool)
|
||||
StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req Request, idle bool)
|
||||
|
||||
// Dump saves and returns the instant internal state of the queue-set.
|
||||
// Note that dumping process will stop the queue-set from proceeding
|
||||
// any requests.
|
||||
// For debugging only.
|
||||
Dump(includeRequestDetails bool) debug.QueueSetDump
|
||||
}
|
||||
|
||||
// Request represents the remainder of the handling of one request
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||
|
@ -221,7 +222,7 @@ const (
|
|||
// executing at each point where there is a change in that quantity,
|
||||
// because the metrics --- and only the metrics --- track that
|
||||
// quantity per FlowSchema.
|
||||
func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (fq.Request, bool) {
|
||||
func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) {
|
||||
qs.lockAndSyncTime()
|
||||
defer qs.lock.Unlock()
|
||||
var req *request
|
||||
|
@ -235,7 +236,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName s
|
|||
metrics.AddReject(qs.qCfg.Name, fsName, "concurrency-limit")
|
||||
return nil, qs.isIdleLocked()
|
||||
}
|
||||
req = qs.dispatchSansQueueLocked(ctx, fsName, descr1, descr2)
|
||||
req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2)
|
||||
return req, false
|
||||
}
|
||||
|
||||
|
@ -246,7 +247,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName s
|
|||
// 3) Reject current request if there is not enough concurrency shares and
|
||||
// we are at max queue length
|
||||
// 4) If not rejected, create a request and enqueue
|
||||
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, fsName, descr1, descr2)
|
||||
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2)
|
||||
// req == nil means that the request was rejected - no remaining
|
||||
// concurrency shares and at max queue length already
|
||||
if req == nil {
|
||||
|
@ -398,7 +399,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
|
|||
// returns the enqueud request on a successful enqueue
|
||||
// returns nil in the case that there is no available concurrency or
|
||||
// the queuelengthlimit has been reached
|
||||
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) *request {
|
||||
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
|
||||
// Start with the shuffle sharding, to pick a queue.
|
||||
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
|
||||
queue := qs.queues[queueIdx]
|
||||
|
@ -410,14 +411,15 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
|
|||
|
||||
// Create a request and enqueue
|
||||
req := &request{
|
||||
qs: qs,
|
||||
fsName: fsName,
|
||||
ctx: ctx,
|
||||
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
|
||||
arrivalTime: qs.clock.Now(),
|
||||
queue: queue,
|
||||
descr1: descr1,
|
||||
descr2: descr2,
|
||||
qs: qs,
|
||||
fsName: fsName,
|
||||
flowDistinguisher: flowDistinguisher,
|
||||
ctx: ctx,
|
||||
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
|
||||
arrivalTime: qs.clock.Now(),
|
||||
queue: queue,
|
||||
descr1: descr1,
|
||||
descr2: descr2,
|
||||
}
|
||||
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
||||
return nil
|
||||
|
@ -523,17 +525,18 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
|
|||
}
|
||||
}
|
||||
|
||||
func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, fsName string, descr1, descr2 interface{}) *request {
|
||||
func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
|
||||
now := qs.clock.Now()
|
||||
req := &request{
|
||||
qs: qs,
|
||||
fsName: fsName,
|
||||
ctx: ctx,
|
||||
startTime: now,
|
||||
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
|
||||
arrivalTime: now,
|
||||
descr1: descr1,
|
||||
descr2: descr2,
|
||||
qs: qs,
|
||||
fsName: fsName,
|
||||
flowDistinguisher: flowDistinguisher,
|
||||
ctx: ctx,
|
||||
startTime: now,
|
||||
decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
|
||||
arrivalTime: now,
|
||||
descr1: descr1,
|
||||
descr2: descr2,
|
||||
}
|
||||
req.decision.SetLocked(decisionExecute)
|
||||
qs.totRequestsExecuting++
|
||||
|
@ -709,3 +712,17 @@ func (qs *queueSet) preCreateOrUnblockGoroutine() {
|
|||
func (qs *queueSet) goroutineDoneOrBlocked() {
|
||||
qs.counter.Add(-1)
|
||||
}
|
||||
|
||||
func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
|
||||
qs.lock.Lock()
|
||||
defer qs.lock.Unlock()
|
||||
d := debug.QueueSetDump{
|
||||
Queues: make([]debug.QueueDump, len(qs.queues)),
|
||||
Waiting: qs.totRequestsWaiting,
|
||||
Executing: qs.totRequestsExecuting,
|
||||
}
|
||||
for i, q := range qs.queues {
|
||||
d.Queues[i] = q.dump(includeRequestDetails)
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet,
|
|||
go func(i, j int, uc uniformClient, igr test.Integrator) {
|
||||
for k := 0; k < uc.nCalls; k++ {
|
||||
ClockWait(clk, counter, uc.thinkDuration)
|
||||
req, idle := qs.StartRequest(context.Background(), uc.hash, fsName, name, []int{i, j, k})
|
||||
req, idle := qs.StartRequest(context.Background(), uc.hash, "", fsName, name, []int{i, j, k})
|
||||
t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle)
|
||||
if req == nil {
|
||||
atomic.AddUint64(&failedCount, 1)
|
||||
|
@ -346,7 +346,7 @@ func TestContextCancel(t *testing.T) {
|
|||
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
|
||||
counter.Add(1) // account for the goroutine running this test
|
||||
ctx1 := context.Background()
|
||||
req1, _ := qs.StartRequest(ctx1, 1, "fs1", "test", "one")
|
||||
req1, _ := qs.StartRequest(ctx1, 1, "", "fs1", "test", "one")
|
||||
if req1 == nil {
|
||||
t.Error("Request rejected")
|
||||
return
|
||||
|
@ -362,7 +362,7 @@ func TestContextCancel(t *testing.T) {
|
|||
counter.Add(1)
|
||||
cancel2()
|
||||
}()
|
||||
req2, idle2a := qs.StartRequest(ctx2, 2, "fs2", "test", "two")
|
||||
req2, idle2a := qs.StartRequest(ctx2, 2, "", "fs2", "test", "two")
|
||||
if idle2a {
|
||||
t.Error("2nd StartRequest returned idle")
|
||||
}
|
||||
|
|
|
@ -20,15 +20,20 @@ import (
|
|||
"context"
|
||||
"time"
|
||||
|
||||
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
|
||||
)
|
||||
|
||||
// request is a temporary container for "requests" with additional
|
||||
// tracking fields required for the functionality FQScheduler
|
||||
type request struct {
|
||||
qs *queueSet
|
||||
fsName string
|
||||
ctx context.Context
|
||||
ctx context.Context
|
||||
|
||||
qs *queueSet
|
||||
|
||||
flowDistinguisher string
|
||||
fsName string
|
||||
|
||||
// The relevant queue. Is nil if this request did not go through
|
||||
// a queue.
|
||||
|
@ -94,3 +99,27 @@ func (q *queue) GetVirtualFinish(J int, G float64) float64 {
|
|||
jg := float64(J+1) * float64(G)
|
||||
return jg + q.virtualStart
|
||||
}
|
||||
|
||||
func (q *queue) dump(includeDetails bool) debug.QueueDump {
|
||||
digest := make([]debug.RequestDump, len(q.requests))
|
||||
for i, r := range q.requests {
|
||||
// dump requests.
|
||||
digest[i].MatchedFlowSchema = r.fsName
|
||||
digest[i].FlowDistinguisher = r.flowDistinguisher
|
||||
digest[i].ArriveTime = r.arrivalTime
|
||||
digest[i].StartTime = r.startTime
|
||||
if includeDetails {
|
||||
userInfo, _ := genericrequest.UserFrom(r.ctx)
|
||||
digest[i].UserName = userInfo.GetName()
|
||||
requestInfo, ok := genericrequest.RequestInfoFrom(r.ctx)
|
||||
if ok {
|
||||
digest[i].RequestInfo = *requestInfo
|
||||
}
|
||||
}
|
||||
}
|
||||
return debug.QueueDump{
|
||||
VirtualStart: q.virtualStart,
|
||||
Requests: digest,
|
||||
ExecutingRequests: q.requestsExecuting,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package testing
|
|||
import (
|
||||
"context"
|
||||
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||
)
|
||||
|
||||
|
@ -53,10 +54,14 @@ func (noRestraint) IsIdle() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (fq.Request, bool) {
|
||||
func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) {
|
||||
return noRestraintRequest{}, false
|
||||
}
|
||||
|
||||
func (noRestraint) Dump(bool) debug.QueueSetDump {
|
||||
return debug.QueueSetDump{}
|
||||
}
|
||||
|
||||
func (noRestraintRequest) Finish(execute func()) (idle bool) {
|
||||
execute()
|
||||
return false
|
||||
|
|
Loading…
Reference in New Issue