diff --git a/pkg/server/config.go b/pkg/server/config.go index ed680f8d5..c744cef1b 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -53,6 +53,7 @@ import ( genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi" apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/features" genericregistry "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/server/dynamiccertificates" "k8s.io/apiserver/pkg/server/egressselector" @@ -60,6 +61,7 @@ import ( "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/routes" serverstore "k8s.io/apiserver/pkg/server/storage" + "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/client-go/informers" restclient "k8s.io/client-go/rest" @@ -709,6 +711,9 @@ func installAPI(s *GenericAPIServer, c *Config) { if c.EnableDiscovery { s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService()) } + if feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) { + c.FlowControl.Install(s.Handler.NonGoRestfulMux) + } } func NewRequestInfoResolver(c *Config) *apirequest.RequestInfoFactory { diff --git a/pkg/util/flowcontrol/apf_controller.go b/pkg/util/flowcontrol/apf_controller.go index e1c446576..577ae2eb1 100644 --- a/pkg/util/flowcontrol/apf_controller.go +++ b/pkg/util/flowcontrol/apf_controller.go @@ -641,14 +641,15 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues } + var flowDistinguisher string var hashValue uint64 if numQueues > 1 { - flowDistinguisher := computeFlowDistinguisher(rd, fs.Spec.DistinguisherMethod) + flowDistinguisher = computeFlowDistinguisher(rd, fs.Spec.DistinguisherMethod) hashValue = hashFlowID(fs.Name, flowDistinguisher) } startWaitingTime = time.Now() klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, fs.Name, fs.Spec.DistinguisherMethod, plName, numQueues) - req, idle := plState.queues.StartRequest(ctx, hashValue, fs.Name, rd.RequestInfo, rd.User) + req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, fs.Name, rd.RequestInfo, rd.User) if idle { cfgCtl.maybeReapLocked(plName, plState) } diff --git a/pkg/util/flowcontrol/apf_controller_debug.go b/pkg/util/flowcontrol/apf_controller_debug.go new file mode 100644 index 000000000..4a467b6d9 --- /dev/null +++ b/pkg/util/flowcontrol/apf_controller_debug.go @@ -0,0 +1,277 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "fmt" + "io" + "net/http" + "strconv" + "strings" + "text/tabwriter" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apiserver/pkg/server/mux" +) + +const ( + queryIncludeRequestDetails = "includeRequestDetails" +) + +func (cfgCtl *configController) Install(c *mux.PathRecorderMux) { + // TODO(yue9944882): handle "Accept" header properly + // debugging dumps a CSV content for three levels of granularity + // 1. row per priority-level + c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_priority_levels", cfgCtl.dumpPriorityLevels) + // 2. row per queue + c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_queues", cfgCtl.dumpQueues) + // 3. row per request + c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_requests", cfgCtl.dumpRequests) +} + +func (cfgCtl *configController) dumpPriorityLevels(w http.ResponseWriter, r *http.Request) { + cfgCtl.lock.Lock() + defer cfgCtl.lock.Unlock() + tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0) + columnHeaders := []string{ + "PriorityLevelName", // 1 + "ActiveQueues", // 2 + "IsIdle", // 3 + "IsQuiescing", // 4 + "WaitingRequests", // 5 + "ExecutingRequests", // 6 + } + tabPrint(tabWriter, rowForHeaders(columnHeaders)) + endline(tabWriter) + for _, plState := range cfgCtl.priorityLevelStates { + if plState.queues == nil { + tabPrint(tabWriter, row( + plState.pl.Name, // 1 + "", // 2 + "", // 3 + "", // 4 + "", // 5 + "", // 6 + )) + endline(tabWriter) + continue + } + queueSetDigest := plState.queues.Dump(false) + activeQueueNum := 0 + for _, q := range queueSetDigest.Queues { + if len(q.Requests) > 0 { + activeQueueNum++ + } + } + + tabPrint(tabWriter, rowForPriorityLevel( + plState.pl.Name, // 1 + activeQueueNum, // 2 + plState.queues.IsIdle(), // 3 + plState.quiescing, // 4 + queueSetDigest.Waiting, // 5 + queueSetDigest.Executing, // 6 + )) + endline(tabWriter) + } + runtime.HandleError(tabWriter.Flush()) +} + +func (cfgCtl *configController) dumpQueues(w http.ResponseWriter, r *http.Request) { + cfgCtl.lock.Lock() + defer cfgCtl.lock.Unlock() + tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0) + columnHeaders := []string{ + "PriorityLevelName", // 1 + "Index", // 2 + "PendingRequests", // 3 + "ExecutingRequests", // 4 + "VirtualStart", // 5 + } + tabPrint(tabWriter, rowForHeaders(columnHeaders)) + endline(tabWriter) + for _, plState := range cfgCtl.priorityLevelStates { + if plState.queues == nil { + tabPrint(tabWriter, row( + plState.pl.Name, // 1 + "", // 2 + "", // 3 + "", // 4 + "", // 5 + )) + endline(tabWriter) + continue + } + queueSetDigest := plState.queues.Dump(false) + for i, q := range queueSetDigest.Queues { + tabPrint(tabWriter, rowForQueue( + plState.pl.Name, // 1 + i, // 2 + len(q.Requests), // 3 + q.ExecutingRequests, // 4 + q.VirtualStart, // 5 + )) + endline(tabWriter) + } + } + runtime.HandleError(tabWriter.Flush()) +} + +func (cfgCtl *configController) dumpRequests(w http.ResponseWriter, r *http.Request) { + cfgCtl.lock.Lock() + defer cfgCtl.lock.Unlock() + + includeRequestDetails := len(r.URL.Query().Get(queryIncludeRequestDetails)) > 0 + + tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0) + tabPrint(tabWriter, rowForHeaders([]string{ + "PriorityLevelName", // 1 + "FlowSchemaName", // 2 + "QueueIndex", // 3 + "RequestIndexInQueue", // 4 + "FlowDistingsher", // 5 + "ArriveTime", // 6 + })) + if includeRequestDetails { + tabPrint(tabWriter, rowForHeaders([]string{ + "UserName", // 7 + "Verb", // 8 + "APIPath", // 9 + "Namespace", // 10 + "Name", // 11 + "APIVersion", // 12 + "Resource", // 13 + "SubResource", // 14 + })) + } + endline(tabWriter) + for _, plState := range cfgCtl.priorityLevelStates { + if plState.queues == nil { + tabPrint(tabWriter, row( + plState.pl.Name, // 1 + "", // 2 + "", // 3 + "", // 4 + "", // 5 + "", // 6 + )) + if includeRequestDetails { + tabPrint(tabWriter, row( + "", // 7 + "", // 8 + "", // 9 + "", // 10 + "", // 11 + "", // 12 + "", // 13 + "", // 14 + )) + } + endline(tabWriter) + continue + } + queueSetDigest := plState.queues.Dump(includeRequestDetails) + for iq, q := range queueSetDigest.Queues { + for ir, r := range q.Requests { + tabPrint(tabWriter, rowForRequest( + plState.pl.Name, // 1 + r.MatchedFlowSchema, // 2 + iq, // 3 + ir, // 4 + r.FlowDistinguisher, // 5 + r.ArriveTime, // 6 + )) + if includeRequestDetails { + tabPrint(tabWriter, rowForRequestDetails( + r.UserName, // 7 + r.RequestInfo.Verb, // 8 + r.RequestInfo.Path, // 9 + r.RequestInfo.Namespace, // 10 + r.RequestInfo.Name, // 11 + schema.GroupVersion{ + Group: r.RequestInfo.APIGroup, + Version: r.RequestInfo.APIVersion, + }.String(), // 12 + r.RequestInfo.Resource, // 13 + r.RequestInfo.Subresource, // 14 + )) + } + endline(tabWriter) + } + } + } + runtime.HandleError(tabWriter.Flush()) +} + +func tabPrint(w io.Writer, row string) { + _, err := fmt.Fprint(w, row) + runtime.HandleError(err) +} +func endline(w io.Writer) { + _, err := fmt.Fprint(w, "\n") + runtime.HandleError(err) +} + +func rowForHeaders(headers []string) string { + return row(headers...) +} + +func rowForPriorityLevel(plName string, activeQueues int, isIdle, isQuiescing bool, waitingRequests, executingRequests int) string { + return row( + plName, + strconv.Itoa(activeQueues), + strconv.FormatBool(isIdle), + strconv.FormatBool(isQuiescing), + strconv.Itoa(waitingRequests), + strconv.Itoa(executingRequests), + ) +} + +func rowForQueue(plName string, index, waitingRequests, executingRequests int, virtualStart float64) string { + return row( + plName, + strconv.Itoa(index), + strconv.Itoa(waitingRequests), + strconv.Itoa(executingRequests), + fmt.Sprintf("%.4f", virtualStart), + ) +} + +func rowForRequest(plName, fsName string, queueIndex, requestIndex int, flowDistinguisher string, arriveTime time.Time) string { + return row( + plName, + fsName, + strconv.Itoa(queueIndex), + strconv.Itoa(requestIndex), + flowDistinguisher, + arriveTime.UTC().Format(time.RFC3339Nano), + ) +} + +func rowForRequestDetails(username, verb, path, namespace, name, apiVersion, resource, subResource string) string { + return row( + username, + verb, + path, + ) +} + +func row(columns ...string) string { + return strings.Join(columns, ",\t") + ",\t" +} diff --git a/pkg/util/flowcontrol/apf_filter.go b/pkg/util/flowcontrol/apf_filter.go index 79bc9faad..0ee68f234 100644 --- a/pkg/util/flowcontrol/apf_filter.go +++ b/pkg/util/flowcontrol/apf_filter.go @@ -22,6 +22,7 @@ import ( "time" "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apiserver/pkg/server/mux" "k8s.io/apiserver/pkg/util/flowcontrol/counter" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset" @@ -51,6 +52,9 @@ type Interface interface { // any needed changes to local behavior. This method ceases // activity and returns after the given channel is closed. Run(stopCh <-chan struct{}) error + + // Install installs debugging endpoints to the web-server. + Install(c *mux.PathRecorderMux) } // This request filter implements https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md diff --git a/pkg/util/flowcontrol/controller_test.go b/pkg/util/flowcontrol/controller_test.go index 84a5d72c5..faa8c7408 100644 --- a/pkg/util/flowcontrol/controller_test.go +++ b/pkg/util/flowcontrol/controller_test.go @@ -28,6 +28,7 @@ import ( fcv1a1 "k8s.io/api/flowcontrol/v1alpha1" "k8s.io/apimachinery/pkg/util/sets" fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" + "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" "k8s.io/client-go/informers" @@ -95,6 +96,10 @@ func (cqs *ctlTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSetC return ctlTestQueueSetCompleter{cqs.cts, cqs, qc}, nil } +func (cqs *ctlTestQueueSet) Dump(bool) debug.QueueSetDump { + return debug.QueueSetDump{} +} + func (cqc ctlTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet { cqc.cts.lock.Lock() defer cqc.cts.lock.Unlock() @@ -115,7 +120,7 @@ func (cqs *ctlTestQueueSet) IsIdle() bool { return cqs.countActive == 0 } -func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (req fq.Request, idle bool) { +func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req fq.Request, idle bool) { cqs.cts.lock.Lock() defer cqs.cts.lock.Unlock() cqs.countActive++ diff --git a/pkg/util/flowcontrol/debug/dump.go b/pkg/util/flowcontrol/debug/dump.go new file mode 100644 index 000000000..d668d9fe7 --- /dev/null +++ b/pkg/util/flowcontrol/debug/dump.go @@ -0,0 +1,47 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package debug + +import ( + "k8s.io/apiserver/pkg/endpoints/request" + "time" +) + +// QueueSetDump is an instant dump of queue-set. +type QueueSetDump struct { + Queues []QueueDump + Waiting int + Executing int +} + +// QueueDump is an instant dump of one queue in a queue-set. +type QueueDump struct { + Requests []RequestDump + VirtualStart float64 + ExecutingRequests int +} + +// RequestDump is an instant dump of one requests pending in the queue. +type RequestDump struct { + MatchedFlowSchema string + FlowDistinguisher string + ArriveTime time.Time + StartTime time.Time + // request details + UserName string + RequestInfo request.RequestInfo +} diff --git a/pkg/util/flowcontrol/fairqueuing/interface.go b/pkg/util/flowcontrol/fairqueuing/interface.go index 5e573bf8f..3ac03f787 100644 --- a/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/pkg/util/flowcontrol/fairqueuing/interface.go @@ -19,6 +19,8 @@ package fairqueuing import ( "context" "time" + + "k8s.io/apiserver/pkg/util/flowcontrol/debug" ) // QueueSetFactory is used to create QueueSet objects. Creation, like @@ -77,7 +79,13 @@ type QueueSet interface { // was idle at the moment of the return. Otherwise idle==false // and the client must call the Finish method of the Request // exactly once. - StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (req Request, idle bool) + StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req Request, idle bool) + + // Dump saves and returns the instant internal state of the queue-set. + // Note that dumping process will stop the queue-set from proceeding + // any requests. + // For debugging only. + Dump(includeRequestDetails bool) debug.QueueSetDump } // Request represents the remainder of the handling of one request diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 57ce829bc..316ca3479 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/util/flowcontrol/counter" + "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" @@ -221,7 +222,7 @@ const ( // executing at each point where there is a change in that quantity, // because the metrics --- and only the metrics --- track that // quantity per FlowSchema. -func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { +func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { qs.lockAndSyncTime() defer qs.lock.Unlock() var req *request @@ -235,7 +236,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName s metrics.AddReject(qs.qCfg.Name, fsName, "concurrency-limit") return nil, qs.isIdleLocked() } - req = qs.dispatchSansQueueLocked(ctx, fsName, descr1, descr2) + req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2) return req, false } @@ -246,7 +247,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName s // 3) Reject current request if there is not enough concurrency shares and // we are at max queue length // 4) If not rejected, create a request and enqueue - req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, fsName, descr1, descr2) + req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2) // req == nil means that the request was rejected - no remaining // concurrency shares and at max queue length already if req == nil { @@ -398,7 +399,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 { // returns the enqueud request on a successful enqueue // returns nil in the case that there is no available concurrency or // the queuelengthlimit has been reached -func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) *request { +func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { // Start with the shuffle sharding, to pick a queue. queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2) queue := qs.queues[queueIdx] @@ -410,14 +411,15 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte // Create a request and enqueue req := &request{ - qs: qs, - fsName: fsName, - ctx: ctx, - decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), - arrivalTime: qs.clock.Now(), - queue: queue, - descr1: descr1, - descr2: descr2, + qs: qs, + fsName: fsName, + flowDistinguisher: flowDistinguisher, + ctx: ctx, + decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), + arrivalTime: qs.clock.Now(), + queue: queue, + descr1: descr1, + descr2: descr2, } if ok := qs.rejectOrEnqueueLocked(req); !ok { return nil @@ -523,17 +525,18 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { } } -func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, fsName string, descr1, descr2 interface{}) *request { +func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { now := qs.clock.Now() req := &request{ - qs: qs, - fsName: fsName, - ctx: ctx, - startTime: now, - decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), - arrivalTime: now, - descr1: descr1, - descr2: descr2, + qs: qs, + fsName: fsName, + flowDistinguisher: flowDistinguisher, + ctx: ctx, + startTime: now, + decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), + arrivalTime: now, + descr1: descr1, + descr2: descr2, } req.decision.SetLocked(decisionExecute) qs.totRequestsExecuting++ @@ -709,3 +712,17 @@ func (qs *queueSet) preCreateOrUnblockGoroutine() { func (qs *queueSet) goroutineDoneOrBlocked() { qs.counter.Add(-1) } + +func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { + qs.lock.Lock() + defer qs.lock.Unlock() + d := debug.QueueSetDump{ + Queues: make([]debug.QueueDump, len(qs.queues)), + Waiting: qs.totRequestsWaiting, + Executing: qs.totRequestsExecuting, + } + for i, q := range qs.queues { + d.Queues[i] = q.dump(includeRequestDetails) + } + return d +} diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 50bc2f7b4..e9c700efc 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -79,7 +79,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, go func(i, j int, uc uniformClient, igr test.Integrator) { for k := 0; k < uc.nCalls; k++ { ClockWait(clk, counter, uc.thinkDuration) - req, idle := qs.StartRequest(context.Background(), uc.hash, fsName, name, []int{i, j, k}) + req, idle := qs.StartRequest(context.Background(), uc.hash, "", fsName, name, []int{i, j, k}) t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle) if req == nil { atomic.AddUint64(&failedCount, 1) @@ -346,7 +346,7 @@ func TestContextCancel(t *testing.T) { qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) counter.Add(1) // account for the goroutine running this test ctx1 := context.Background() - req1, _ := qs.StartRequest(ctx1, 1, "fs1", "test", "one") + req1, _ := qs.StartRequest(ctx1, 1, "", "fs1", "test", "one") if req1 == nil { t.Error("Request rejected") return @@ -362,7 +362,7 @@ func TestContextCancel(t *testing.T) { counter.Add(1) cancel2() }() - req2, idle2a := qs.StartRequest(ctx2, 2, "fs2", "test", "two") + req2, idle2a := qs.StartRequest(ctx2, 2, "", "fs2", "test", "two") if idle2a { t.Error("2nd StartRequest returned idle") } diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 1facc701d..1bcb8cfb3 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -20,15 +20,20 @@ import ( "context" "time" + genericrequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/util/flowcontrol/debug" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise" ) // request is a temporary container for "requests" with additional // tracking fields required for the functionality FQScheduler type request struct { - qs *queueSet - fsName string - ctx context.Context + ctx context.Context + + qs *queueSet + + flowDistinguisher string + fsName string // The relevant queue. Is nil if this request did not go through // a queue. @@ -94,3 +99,27 @@ func (q *queue) GetVirtualFinish(J int, G float64) float64 { jg := float64(J+1) * float64(G) return jg + q.virtualStart } + +func (q *queue) dump(includeDetails bool) debug.QueueDump { + digest := make([]debug.RequestDump, len(q.requests)) + for i, r := range q.requests { + // dump requests. + digest[i].MatchedFlowSchema = r.fsName + digest[i].FlowDistinguisher = r.flowDistinguisher + digest[i].ArriveTime = r.arrivalTime + digest[i].StartTime = r.startTime + if includeDetails { + userInfo, _ := genericrequest.UserFrom(r.ctx) + digest[i].UserName = userInfo.GetName() + requestInfo, ok := genericrequest.RequestInfoFrom(r.ctx) + if ok { + digest[i].RequestInfo = *requestInfo + } + } + } + return debug.QueueDump{ + VirtualStart: q.virtualStart, + Requests: digest, + ExecutingRequests: q.requestsExecuting, + } +} diff --git a/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index 14504f201..72e7f5706 100644 --- a/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -19,6 +19,7 @@ package testing import ( "context" + "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" ) @@ -53,10 +54,14 @@ func (noRestraint) IsIdle() bool { return false } -func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { +func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) { return noRestraintRequest{}, false } +func (noRestraint) Dump(bool) debug.QueueSetDump { + return debug.QueueSetDump{} +} + func (noRestraintRequest) Finish(execute func()) (idle bool) { execute() return false