Add a metric to track usage of inflight request limit.
Kubernetes-commit: 000d7bac29b9239a29531a526d382394d8d60353
This commit is contained in:
		
							parent
							
								
									4b887934ec
								
							
						
					
					
						commit
						f2c38580dc
					
				| 
						 | 
				
			
			@ -86,6 +86,15 @@ var (
 | 
			
		|||
		},
 | 
			
		||||
		[]string{"requestKind"},
 | 
			
		||||
	)
 | 
			
		||||
	// Becasue of volatality of the base metric this is pre-aggregated one. Instead of reporing current usage all the time
 | 
			
		||||
	// it reports maximal usage during the last second.
 | 
			
		||||
	currentInflightRequests = prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "apiserver_current_inflight_requests",
 | 
			
		||||
			Help: "Maximal mumber of currently used inflight request limit of this apiserver per request kind in last second.",
 | 
			
		||||
		},
 | 
			
		||||
		[]string{"requestKind"},
 | 
			
		||||
	)
 | 
			
		||||
	kubectlExeRegexp = regexp.MustCompile(`^.*((?i:kubectl\.exe))`)
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -104,6 +113,12 @@ func init() {
 | 
			
		|||
	prometheus.MustRegister(requestLatenciesSummary)
 | 
			
		||||
	prometheus.MustRegister(responseSizes)
 | 
			
		||||
	prometheus.MustRegister(DroppedRequests)
 | 
			
		||||
	prometheus.MustRegister(currentInflightRequests)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func UpdateInflightRequestMetrics(nonmutating, mutating int) {
 | 
			
		||||
	currentInflightRequests.WithLabelValues(ReadOnlyKind).Set(float64(nonmutating))
 | 
			
		||||
	currentInflightRequests.WithLabelValues(MutatingKind).Set(float64(mutating))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Record records a single request to the standard metrics endpoints. For use by handlers that perform their own
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -47,6 +47,7 @@ go_library(
 | 
			
		|||
        "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -19,8 +19,11 @@ package filters
 | 
			
		|||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/apiserver/pkg/authentication/user"
 | 
			
		||||
	"k8s.io/apiserver/pkg/endpoints/metrics"
 | 
			
		||||
	apirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
			
		||||
| 
						 | 
				
			
			@ -28,9 +31,16 @@ import (
 | 
			
		|||
	"github.com/golang/glog"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	// Constant for the retry-after interval on rate limiting.
 | 
			
		||||
	// TODO: maybe make this dynamic? or user-adjustable?
 | 
			
		||||
const retryAfter = "1"
 | 
			
		||||
	retryAfter = "1"
 | 
			
		||||
 | 
			
		||||
	// How often inflight usage metric should be updated. Because
 | 
			
		||||
	// the metrics tracks maximal value over period making this
 | 
			
		||||
	// longer will increase the metric value.
 | 
			
		||||
	inflightUsageMetricUpdatePeriod = time.Second
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -40,6 +50,49 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
 | 
			
		|||
	glog.Errorf(err.Error())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// requestWatermark is used to trak maximal usage of inflight requests.
 | 
			
		||||
type requestWatermark struct {
 | 
			
		||||
	lock                                 sync.Mutex
 | 
			
		||||
	readOnlyWatermark, mutatingWatermark int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *requestWatermark) recordMutating(mutatingVal int) {
 | 
			
		||||
	w.lock.Lock()
 | 
			
		||||
	defer w.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	if w.mutatingWatermark < mutatingVal {
 | 
			
		||||
		w.mutatingWatermark = mutatingVal
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
 | 
			
		||||
	w.lock.Lock()
 | 
			
		||||
	defer w.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	if w.readOnlyWatermark < readOnlyVal {
 | 
			
		||||
		w.readOnlyWatermark = readOnlyVal
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var watermark = &requestWatermark{}
 | 
			
		||||
 | 
			
		||||
func startRecordingUsage() {
 | 
			
		||||
	go func() {
 | 
			
		||||
		wait.Forever(func() {
 | 
			
		||||
			watermark.lock.Lock()
 | 
			
		||||
			readOnlyWatermark := watermark.readOnlyWatermark
 | 
			
		||||
			mutatingWatermark := watermark.mutatingWatermark
 | 
			
		||||
			watermark.readOnlyWatermark = 0
 | 
			
		||||
			watermark.mutatingWatermark = 0
 | 
			
		||||
			watermark.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
			metrics.UpdateInflightRequestMetrics(readOnlyWatermark, mutatingWatermark)
 | 
			
		||||
		}, inflightUsageMetricUpdatePeriod)
 | 
			
		||||
	}()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var startOnce sync.Once
 | 
			
		||||
 | 
			
		||||
// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
 | 
			
		||||
func WithMaxInFlightLimit(
 | 
			
		||||
	handler http.Handler,
 | 
			
		||||
| 
						 | 
				
			
			@ -48,6 +101,7 @@ func WithMaxInFlightLimit(
 | 
			
		|||
	requestContextMapper apirequest.RequestContextMapper,
 | 
			
		||||
	longRunningRequestCheck apirequest.LongRunningRequestCheck,
 | 
			
		||||
) http.Handler {
 | 
			
		||||
	startOnce.Do(startRecordingUsage)
 | 
			
		||||
	if nonMutatingLimit == 0 && mutatingLimit == 0 {
 | 
			
		||||
		return handler
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -92,7 +146,22 @@ func WithMaxInFlightLimit(
 | 
			
		|||
 | 
			
		||||
			select {
 | 
			
		||||
			case c <- true:
 | 
			
		||||
				defer func() { <-c }()
 | 
			
		||||
				var mutatingLen, readOnlyLen int
 | 
			
		||||
				if isMutatingRequest {
 | 
			
		||||
					mutatingLen = len(mutatingChan)
 | 
			
		||||
				} else {
 | 
			
		||||
					readOnlyLen = len(nonMutatingChan)
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				defer func() {
 | 
			
		||||
					<-c
 | 
			
		||||
					if isMutatingRequest {
 | 
			
		||||
						watermark.recordMutating(mutatingLen)
 | 
			
		||||
					} else {
 | 
			
		||||
						watermark.recordReadOnly(readOnlyLen)
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
				}()
 | 
			
		||||
				handler.ServeHTTP(w, r)
 | 
			
		||||
 | 
			
		||||
			default:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue