Fix APF metric denominator problems
Co-authored-by: JUN YANG <yang.jun22@zte.com.cn> Kubernetes-commit: fdd921cad0cd9308ec62c1b86c9c1cc5d12e5d21
This commit is contained in:
		
							parent
							
								
									57f72cea4a
								
							
						
					
					
						commit
						eb15930b31
					
				| 
						 | 
					@ -41,10 +41,6 @@ const (
 | 
				
			||||||
	// the metrics tracks maximal value over period making this
 | 
						// the metrics tracks maximal value over period making this
 | 
				
			||||||
	// longer will increase the metric value.
 | 
						// longer will increase the metric value.
 | 
				
			||||||
	inflightUsageMetricUpdatePeriod = time.Second
 | 
						inflightUsageMetricUpdatePeriod = time.Second
 | 
				
			||||||
 | 
					 | 
				
			||||||
	// How often to run maintenance on observations to ensure
 | 
					 | 
				
			||||||
	// that they do not fall too far behind.
 | 
					 | 
				
			||||||
	observationMaintenancePeriod = 10 * time.Second
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var (
 | 
					var (
 | 
				
			||||||
| 
						 | 
					@ -91,8 +87,6 @@ func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
 | 
				
			||||||
// watermark tracks requests being executed (not waiting in a queue)
 | 
					// watermark tracks requests being executed (not waiting in a queue)
 | 
				
			||||||
var watermark = &requestWatermark{
 | 
					var watermark = &requestWatermark{
 | 
				
			||||||
	phase: metrics.ExecutingPhase,
 | 
						phase: metrics.ExecutingPhase,
 | 
				
			||||||
	readOnlyObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueExecuting, metrics.ReadOnlyKind}),
 | 
					 | 
				
			||||||
	mutatingObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueExecuting, metrics.MutatingKind}),
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark.
 | 
					// startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark.
 | 
				
			||||||
| 
						 | 
					@ -108,14 +102,25 @@ func startWatermarkMaintenance(watermark *requestWatermark, stopCh <-chan struct
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
 | 
							metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
 | 
				
			||||||
	}, inflightUsageMetricUpdatePeriod, stopCh)
 | 
						}, inflightUsageMetricUpdatePeriod, stopCh)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Periodically observe the watermarks. This is done to ensure that they do not fall too far behind. When they do
 | 
					var initMaxInFlightOnce sync.Once
 | 
				
			||||||
	// fall too far behind, then there is a long delay in responding to the next request received while the observer
 | 
					
 | 
				
			||||||
	// catches back up.
 | 
					func initMaxInFlight(nonMutatingLimit, mutatingLimit int) {
 | 
				
			||||||
	go wait.Until(func() {
 | 
						initMaxInFlightOnce.Do(func() {
 | 
				
			||||||
		watermark.readOnlyObserver.Add(0)
 | 
							// Fetching these gauges is delayed until after their underlying metric has been registered
 | 
				
			||||||
		watermark.mutatingObserver.Add(0)
 | 
							// so that this latches onto the efficient implementation.
 | 
				
			||||||
	}, observationMaintenancePeriod, stopCh)
 | 
							watermark.readOnlyObserver = fcmetrics.GetExecutingReadonlyConcurrency()
 | 
				
			||||||
 | 
							watermark.mutatingObserver = fcmetrics.GetExecutingMutatingConcurrency()
 | 
				
			||||||
 | 
							if nonMutatingLimit != 0 {
 | 
				
			||||||
 | 
								watermark.readOnlyObserver.SetDenominator(float64(nonMutatingLimit))
 | 
				
			||||||
 | 
								klog.V(2).InfoS("Set denominator for readonly requests", "limit", nonMutatingLimit)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if mutatingLimit != 0 {
 | 
				
			||||||
 | 
								watermark.mutatingObserver.SetDenominator(float64(mutatingLimit))
 | 
				
			||||||
 | 
								klog.V(2).InfoS("Set denominator for mutating requests", "limit", mutatingLimit)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
 | 
					// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
 | 
				
			||||||
| 
						 | 
					@ -132,12 +137,17 @@ func WithMaxInFlightLimit(
 | 
				
			||||||
	var mutatingChan chan bool
 | 
						var mutatingChan chan bool
 | 
				
			||||||
	if nonMutatingLimit != 0 {
 | 
						if nonMutatingLimit != 0 {
 | 
				
			||||||
		nonMutatingChan = make(chan bool, nonMutatingLimit)
 | 
							nonMutatingChan = make(chan bool, nonMutatingLimit)
 | 
				
			||||||
		watermark.readOnlyObserver.SetDenominator(float64(nonMutatingLimit))
 | 
							klog.V(2).InfoS("Initialized nonMutatingChan", "len", nonMutatingLimit)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							klog.V(2).InfoS("Running with nil nonMutatingChan")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if mutatingLimit != 0 {
 | 
						if mutatingLimit != 0 {
 | 
				
			||||||
		mutatingChan = make(chan bool, mutatingLimit)
 | 
							mutatingChan = make(chan bool, mutatingLimit)
 | 
				
			||||||
		watermark.mutatingObserver.SetDenominator(float64(mutatingLimit))
 | 
							klog.V(2).InfoS("Initialized mutatingChan", "len", mutatingLimit)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							klog.V(2).InfoS("Running with nil mutatingChan")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						initMaxInFlight(nonMutatingLimit, mutatingLimit)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 | 
						return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 | 
				
			||||||
		ctx := r.Context()
 | 
							ctx := r.Context()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -32,7 +32,7 @@ import (
 | 
				
			||||||
	fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
 | 
						fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
 | 
					func createMaxInflightServer(t *testing.T, callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
 | 
				
			||||||
	fcmetrics.Register()
 | 
						fcmetrics.Register()
 | 
				
			||||||
	longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
 | 
						longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -49,7 +49,9 @@ func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *b
 | 
				
			||||||
			if waitForCalls {
 | 
								if waitForCalls {
 | 
				
			||||||
				callsWg.Done()
 | 
									callsWg.Done()
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
								t.Logf("About to blockWg.Wait(), requestURI=%v, remoteAddr=%v", r.RequestURI, r.RemoteAddr)
 | 
				
			||||||
			blockWg.Wait()
 | 
								blockWg.Wait()
 | 
				
			||||||
 | 
								t.Logf("Returned from blockWg.Wait(), requestURI=%v, remoteAddr=%v", r.RequestURI, r.RemoteAddr)
 | 
				
			||||||
		}),
 | 
							}),
 | 
				
			||||||
		nonMutating,
 | 
							nonMutating,
 | 
				
			||||||
		mutating,
 | 
							mutating,
 | 
				
			||||||
| 
						 | 
					@ -103,7 +105,7 @@ func TestMaxInFlightNonMutating(t *testing.T) {
 | 
				
			||||||
	waitForCalls := true
 | 
						waitForCalls := true
 | 
				
			||||||
	waitForCallsMutex := sync.Mutex{}
 | 
						waitForCallsMutex := sync.Mutex{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
 | 
						server := createMaxInflightServer(t, calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
 | 
				
			||||||
	defer server.Close()
 | 
						defer server.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
						ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
| 
						 | 
					@ -187,7 +189,7 @@ func TestMaxInFlightMutating(t *testing.T) {
 | 
				
			||||||
	waitForCalls := true
 | 
						waitForCalls := true
 | 
				
			||||||
	waitForCallsMutex := sync.Mutex{}
 | 
						waitForCallsMutex := sync.Mutex{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
 | 
						server := createMaxInflightServer(t, calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
 | 
				
			||||||
	defer server.Close()
 | 
						defer server.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
						ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
| 
						 | 
					@ -283,7 +285,7 @@ func TestMaxInFlightSkipsMasters(t *testing.T) {
 | 
				
			||||||
	waitForCalls := true
 | 
						waitForCalls := true
 | 
				
			||||||
	waitForCallsMutex := sync.Mutex{}
 | 
						waitForCallsMutex := sync.Mutex{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
 | 
						server := createMaxInflightServer(t, calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
 | 
				
			||||||
	defer server.Close()
 | 
						defer server.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
						ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -21,6 +21,7 @@ import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"runtime"
 | 
						"runtime"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
	"sync/atomic"
 | 
						"sync/atomic"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -47,8 +48,6 @@ type PriorityAndFairnessClassification struct {
 | 
				
			||||||
// waitingMark tracks requests waiting rather than being executed
 | 
					// waitingMark tracks requests waiting rather than being executed
 | 
				
			||||||
var waitingMark = &requestWatermark{
 | 
					var waitingMark = &requestWatermark{
 | 
				
			||||||
	phase: epmetrics.WaitingPhase,
 | 
						phase: epmetrics.WaitingPhase,
 | 
				
			||||||
	readOnlyObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueWaiting, epmetrics.ReadOnlyKind}),
 | 
					 | 
				
			||||||
	mutatingObserver: fcmetrics.ReadWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, []string{fcmetrics.LabelValueWaiting, epmetrics.MutatingKind}),
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var atomicMutatingExecuting, atomicReadOnlyExecuting int32
 | 
					var atomicMutatingExecuting, atomicReadOnlyExecuting int32
 | 
				
			||||||
| 
						 | 
					@ -66,6 +65,8 @@ func truncateLogField(s string) string {
 | 
				
			||||||
	return s
 | 
						return s
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var initAPFOnce sync.Once
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// WithPriorityAndFairness limits the number of in-flight
 | 
					// WithPriorityAndFairness limits the number of in-flight
 | 
				
			||||||
// requests in a fine-grained way.
 | 
					// requests in a fine-grained way.
 | 
				
			||||||
func WithPriorityAndFairness(
 | 
					func WithPriorityAndFairness(
 | 
				
			||||||
| 
						 | 
					@ -78,6 +79,13 @@ func WithPriorityAndFairness(
 | 
				
			||||||
		klog.Warningf("priority and fairness support not found, skipping")
 | 
							klog.Warningf("priority and fairness support not found, skipping")
 | 
				
			||||||
		return handler
 | 
							return handler
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						initAPFOnce.Do(func() {
 | 
				
			||||||
 | 
							initMaxInFlight(0, 0)
 | 
				
			||||||
 | 
							// Fetching these gauges is delayed until after their underlying metric has been registered
 | 
				
			||||||
 | 
							// so that this latches onto the efficient implementation.
 | 
				
			||||||
 | 
							waitingMark.readOnlyObserver = fcmetrics.GetWaitingReadonlyConcurrency()
 | 
				
			||||||
 | 
							waitingMark.mutatingObserver = fcmetrics.GetWaitingMutatingConcurrency()
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 | 
						return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 | 
				
			||||||
		ctx := r.Context()
 | 
							ctx := r.Context()
 | 
				
			||||||
		requestInfo, ok := apirequest.RequestInfoFrom(ctx)
 | 
							requestInfo, ok := apirequest.RequestInfoFrom(ctx)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -143,8 +143,8 @@ type configController struct {
 | 
				
			||||||
	// This may only be accessed from the one and only worker goroutine.
 | 
						// This may only be accessed from the one and only worker goroutine.
 | 
				
			||||||
	mostRecentUpdates []updateAttempt
 | 
						mostRecentUpdates []updateAttempt
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// This must be locked while accessing flowSchemas or
 | 
						// This must be locked while accessing the later fields.
 | 
				
			||||||
	// priorityLevelStates.  A lock for writing is needed
 | 
						// A lock for writing is needed
 | 
				
			||||||
	// for writing to any of the following:
 | 
						// for writing to any of the following:
 | 
				
			||||||
	// - the flowSchemas field
 | 
						// - the flowSchemas field
 | 
				
			||||||
	// - the slice held in the flowSchemas field
 | 
						// - the slice held in the flowSchemas field
 | 
				
			||||||
| 
						 | 
					@ -387,6 +387,8 @@ type cfgMeal struct {
 | 
				
			||||||
	// provoking a call into this controller while the lock held
 | 
						// provoking a call into this controller while the lock held
 | 
				
			||||||
	// waiting on that request to complete.
 | 
						// waiting on that request to complete.
 | 
				
			||||||
	fsStatusUpdates []fsStatusUpdate
 | 
						fsStatusUpdates []fsStatusUpdate
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						maxWaitingRequests, maxExecutingRequests int
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// A buffered set of status updates for FlowSchemas
 | 
					// A buffered set of status updates for FlowSchemas
 | 
				
			||||||
| 
						 | 
					@ -511,7 +513,13 @@ func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontro
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// The new config has been constructed
 | 
						// The new config has been constructed
 | 
				
			||||||
	cfgCtlr.priorityLevelStates = meal.newPLStates
 | 
						cfgCtlr.priorityLevelStates = meal.newPLStates
 | 
				
			||||||
	klog.V(5).Infof("Switched to new API Priority and Fairness configuration")
 | 
						klog.V(5).InfoS("Switched to new API Priority and Fairness configuration", "maxWaitingRequests", meal.maxWaitingRequests, "maxExecutinRequests", meal.maxExecutingRequests)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						metrics.GetWaitingReadonlyConcurrency().SetDenominator(float64(meal.maxWaitingRequests))
 | 
				
			||||||
 | 
						metrics.GetWaitingMutatingConcurrency().SetDenominator(float64(meal.maxWaitingRequests))
 | 
				
			||||||
 | 
						metrics.GetExecutingReadonlyConcurrency().SetDenominator(float64(meal.maxExecutingRequests))
 | 
				
			||||||
 | 
						metrics.GetExecutingMutatingConcurrency().SetDenominator(float64(meal.maxExecutingRequests))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return meal.fsStatusUpdates
 | 
						return meal.fsStatusUpdates
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -663,6 +671,12 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 | 
				
			||||||
		// difference will be negligible.
 | 
							// difference will be negligible.
 | 
				
			||||||
		concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum))
 | 
							concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum))
 | 
				
			||||||
		metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
 | 
							metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
 | 
				
			||||||
 | 
							meal.maxExecutingRequests += concurrencyLimit
 | 
				
			||||||
 | 
							var waitLimit int
 | 
				
			||||||
 | 
							if qCfg := plState.pl.Spec.Limited.LimitResponse.Queuing; qCfg != nil {
 | 
				
			||||||
 | 
								waitLimit = int(qCfg.Queues * qCfg.QueueLengthLimit)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							meal.maxWaitingRequests += waitLimit
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if plState.queues == nil {
 | 
							if plState.queues == nil {
 | 
				
			||||||
			klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.AssuredConcurrencyShares, meal.shareSum)
 | 
								klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.AssuredConcurrencyShares, meal.shareSum)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -243,6 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
 | 
				
			||||||
	if qll < 1 {
 | 
						if qll < 1 {
 | 
				
			||||||
		qll = 1
 | 
							qll = 1
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if qCfg.DesiredNumQueues > 0 {
 | 
				
			||||||
 | 
							qll *= qCfg.DesiredNumQueues
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	qs.reqsGaugePair.RequestsWaiting.SetDenominator(float64(qll))
 | 
						qs.reqsGaugePair.RequestsWaiting.SetDenominator(float64(qll))
 | 
				
			||||||
	qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
 | 
						qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
 | 
				
			||||||
	qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyLimit))
 | 
						qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyLimit))
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -23,6 +23,7 @@ import (
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
 | 
				
			||||||
	apirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
						apirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
				
			||||||
	compbasemetrics "k8s.io/component-base/metrics"
 | 
						compbasemetrics "k8s.io/component-base/metrics"
 | 
				
			||||||
	"k8s.io/component-base/metrics/legacyregistry"
 | 
						"k8s.io/component-base/metrics/legacyregistry"
 | 
				
			||||||
| 
						 | 
					@ -128,21 +129,22 @@ var (
 | 
				
			||||||
			Name:      "priority_level_request_utilization",
 | 
								Name:      "priority_level_request_utilization",
 | 
				
			||||||
			Help:      "Observations, at the end of every nanosecond, of number of requests (as a fraction of the relevant limit) waiting or in any stage of execution (but only initial stage for WATCHes)",
 | 
								Help:      "Observations, at the end of every nanosecond, of number of requests (as a fraction of the relevant limit) waiting or in any stage of execution (but only initial stage for WATCHes)",
 | 
				
			||||||
			// For executing: the denominator will be seats, so this metric will skew low.
 | 
								// For executing: the denominator will be seats, so this metric will skew low.
 | 
				
			||||||
			// FOr waiting: the denominiator is individual queue length limit, so this metric can go over 1.  Issue #110160
 | 
								// For waiting: total queue capacity is generally quite generous, so this metric will skew low.
 | 
				
			||||||
			Buckets:        []float64{0, 0.001, 0.0025, 0.005, 0.1, 0.25, 0.5, 0.75, 1, 10, 100},
 | 
								Buckets:        []float64{0, 0.001, 0.003, 0.01, 0.03, 0.1, 0.25, 0.5, 0.75, 1},
 | 
				
			||||||
			StabilityLevel: compbasemetrics.ALPHA,
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		LabelNamePhase, priorityLevel,
 | 
							LabelNamePhase, priorityLevel,
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	// ReadWriteConcurrencyPairVec creates gauges of number of requests broken down by phase and mutating vs readonly
 | 
						// readWriteConcurrencyGaugeVec creates ratioed gauges of requests/limit broken down by phase and mutating vs readonly
 | 
				
			||||||
	ReadWriteConcurrencyGaugeVec = NewTimingRatioHistogramVec(
 | 
						readWriteConcurrencyGaugeVec = NewTimingRatioHistogramVec(
 | 
				
			||||||
		&compbasemetrics.TimingHistogramOpts{
 | 
							&compbasemetrics.TimingHistogramOpts{
 | 
				
			||||||
			Namespace: namespace,
 | 
								Namespace: namespace,
 | 
				
			||||||
			Subsystem: subsystem,
 | 
								Subsystem: subsystem,
 | 
				
			||||||
			Name:      "read_vs_write_current_requests",
 | 
								Name:      "read_vs_write_current_requests",
 | 
				
			||||||
			Help:      "Observations, at the end of every nanosecond, of the number of requests (as a fraction of the relevant limit, if max-in-flight filter is being used) waiting or in regular stage of execution",
 | 
								Help:      "Observations, at the end of every nanosecond, of the number of requests (as a fraction of the relevant limit) waiting or in regular stage of execution",
 | 
				
			||||||
			Buckets:   []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99, 1, 3, 10, 30, 100, 300, 1000, 3000},
 | 
								// This metric will skew low for the same reason as the priority level metrics
 | 
				
			||||||
			// TODO: something about the utilization vs count irregularity.  Issue #109846
 | 
								// and also because APF has a combined limit for mutating and readonly.
 | 
				
			||||||
 | 
								Buckets:        []float64{0, 0.001, 0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99, 1},
 | 
				
			||||||
			StabilityLevel: compbasemetrics.ALPHA,
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		LabelNamePhase, requestKind,
 | 
							LabelNamePhase, requestKind,
 | 
				
			||||||
| 
						 | 
					@ -337,9 +339,39 @@ var (
 | 
				
			||||||
	}.
 | 
						}.
 | 
				
			||||||
		Append(PriorityLevelExecutionSeatsGaugeVec.metrics()...).
 | 
							Append(PriorityLevelExecutionSeatsGaugeVec.metrics()...).
 | 
				
			||||||
		Append(PriorityLevelConcurrencyGaugeVec.metrics()...).
 | 
							Append(PriorityLevelConcurrencyGaugeVec.metrics()...).
 | 
				
			||||||
		Append(ReadWriteConcurrencyGaugeVec.metrics()...)
 | 
							Append(readWriteConcurrencyGaugeVec.metrics()...)
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type indexOnce struct {
 | 
				
			||||||
 | 
						labelValues []string
 | 
				
			||||||
 | 
						once        sync.Once
 | 
				
			||||||
 | 
						gauge       RatioedGauge
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (io *indexOnce) getGauge() RatioedGauge {
 | 
				
			||||||
 | 
						io.once.Do(func() {
 | 
				
			||||||
 | 
							io.gauge = readWriteConcurrencyGaugeVec.NewForLabelValuesSafe(0, 1, io.labelValues)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						return io.gauge
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var waitingReadonly = indexOnce{labelValues: []string{LabelValueWaiting, epmetrics.ReadOnlyKind}}
 | 
				
			||||||
 | 
					var executingReadonly = indexOnce{labelValues: []string{LabelValueExecuting, epmetrics.ReadOnlyKind}}
 | 
				
			||||||
 | 
					var waitingMutating = indexOnce{labelValues: []string{LabelValueWaiting, epmetrics.MutatingKind}}
 | 
				
			||||||
 | 
					var executingMutating = indexOnce{labelValues: []string{LabelValueExecuting, epmetrics.MutatingKind}}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetWaitingReadonlyConcurrency returns the gauge of number of readonly requests waiting / limit on those.
 | 
				
			||||||
 | 
					var GetWaitingReadonlyConcurrency = waitingReadonly.getGauge
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetExecutingReadonlyConcurrency returns the gauge of number of executing readonly requests / limit on those.
 | 
				
			||||||
 | 
					var GetExecutingReadonlyConcurrency = executingReadonly.getGauge
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetWaitingMutatingConcurrency returns the gauge of number of mutating requests waiting / limit on those.
 | 
				
			||||||
 | 
					var GetWaitingMutatingConcurrency = waitingMutating.getGauge
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetExecutingMutatingConcurrency returns the gauge of number of executing mutating requests / limit on those.
 | 
				
			||||||
 | 
					var GetExecutingMutatingConcurrency = executingMutating.getGauge
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel
 | 
					// AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel
 | 
				
			||||||
func AddRequestsInQueues(ctx context.Context, priorityLevel, flowSchema string, delta int) {
 | 
					func AddRequestsInQueues(ctx context.Context, priorityLevel, flowSchema string, delta int) {
 | 
				
			||||||
	apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
 | 
						apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -192,12 +192,14 @@ func (v *TimingRatioHistogramVec) NewForLabelValuesChecked(initialNumerator, ini
 | 
				
			||||||
func (v *TimingRatioHistogramVec) NewForLabelValuesSafe(initialNumerator, initialDenominator float64, labelValues []string) RatioedGauge {
 | 
					func (v *TimingRatioHistogramVec) NewForLabelValuesSafe(initialNumerator, initialDenominator float64, labelValues []string) RatioedGauge {
 | 
				
			||||||
	tro, err := v.NewForLabelValuesChecked(initialNumerator, initialDenominator, labelValues)
 | 
						tro, err := v.NewForLabelValuesChecked(initialNumerator, initialDenominator, labelValues)
 | 
				
			||||||
	if err == nil {
 | 
						if err == nil {
 | 
				
			||||||
 | 
							klog.V(3).InfoS("TimingRatioHistogramVec.NewForLabelValuesSafe hit the efficient case", "fqName", v.FQName(), "labelValues", labelValues)
 | 
				
			||||||
		return tro
 | 
							return tro
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if !compbasemetrics.ErrIsNotRegistered(err) {
 | 
						if !compbasemetrics.ErrIsNotRegistered(err) {
 | 
				
			||||||
		klog.ErrorS(err, "Failed to extract TimingRatioHistogramVec member, using noop instead", "vectorname", v.FQName(), "labelValues", labelValues)
 | 
							klog.ErrorS(err, "Failed to extract TimingRatioHistogramVec member, using noop instead", "vectorname", v.FQName(), "labelValues", labelValues)
 | 
				
			||||||
		return tro
 | 
							return tro
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						klog.V(3).InfoS("TimingRatioHistogramVec.NewForLabelValuesSafe hit the inefficient case", "fqName", v.FQName(), "labelValues", labelValues)
 | 
				
			||||||
	// At this point we know v.NewForLabelValuesChecked(..) returns a permanent noop,
 | 
						// At this point we know v.NewForLabelValuesChecked(..) returns a permanent noop,
 | 
				
			||||||
	// which we precisely want to avoid using.  Instead, make our own gauge that
 | 
						// which we precisely want to avoid using.  Instead, make our own gauge that
 | 
				
			||||||
	// fetches the element on every Set.
 | 
						// fetches the element on every Set.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue