Estimate width of the request based on watchers count in P&F
Kubernetes-commit: 223f9be59778b6ec2e44fd57df523f00e246bd95
This commit is contained in:
		
							parent
							
								
									be4fe9cf7c
								
							
						
					
					
						commit
						c18ab3e1b1
					
				| 
						 | 
				
			
			@ -758,7 +758,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
 | 
			
		|||
	handler = filterlatency.TrackStarted(handler, "authorization")
 | 
			
		||||
 | 
			
		||||
	if c.FlowControl != nil {
 | 
			
		||||
		requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get)
 | 
			
		||||
		requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount)
 | 
			
		||||
		handler = filterlatency.TrackCompleted(handler)
 | 
			
		||||
		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
 | 
			
		||||
		handler = filterlatency.TrackStarted(handler, "priorityandfairness")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,92 @@
 | 
			
		|||
/*
 | 
			
		||||
Copyright 2021 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package request
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"math"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	apirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func newMutatingWorkEstimator(countFn watchCountGetterFunc) WorkEstimatorFunc {
 | 
			
		||||
	estimator := &mutatingWorkEstimator{
 | 
			
		||||
		countFn: countFn,
 | 
			
		||||
	}
 | 
			
		||||
	return estimator.estimate
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type mutatingWorkEstimator struct {
 | 
			
		||||
	countFn watchCountGetterFunc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	watchesPerSeat          = 10.0
 | 
			
		||||
	eventAdditionalDuration = 5 * time.Millisecond
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
 | 
			
		||||
	requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
 | 
			
		||||
	if !ok {
 | 
			
		||||
		// no RequestInfo should never happen, but to be on the safe side
 | 
			
		||||
		// let's return a large value.
 | 
			
		||||
		return WorkEstimate{
 | 
			
		||||
			InitialSeats:      1,
 | 
			
		||||
			FinalSeats:        maximumSeats,
 | 
			
		||||
			AdditionalLatency: eventAdditionalDuration,
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	watchCount := e.countFn(requestInfo)
 | 
			
		||||
 | 
			
		||||
	// The cost of the request associated with the watchers of that event
 | 
			
		||||
	// consists of three parts:
 | 
			
		||||
	// - cost of going through the event change logic
 | 
			
		||||
	// - cost of serialization of the event
 | 
			
		||||
	// - cost of processing an event object for each watcher (e.g. filtering,
 | 
			
		||||
	//     sending data over network)
 | 
			
		||||
	// We're starting simple to get some operational experience with it and
 | 
			
		||||
	// we will work on tuning the algorithm later. As a starting point we
 | 
			
		||||
	// we simply assume that processing 1 event takes 1/Nth of a seat for
 | 
			
		||||
	// M milliseconds and processing different events is infinitely parallelizable.
 | 
			
		||||
	// We simply record the appropriate values here and rely on potential
 | 
			
		||||
	// reshaping of the request if the concurrency limit for a given priority
 | 
			
		||||
	// level will not allow to run request with that many seats.
 | 
			
		||||
	//
 | 
			
		||||
	// TODO: As described in the KEP, we should take into account that not all
 | 
			
		||||
	//   events are equal and try to estimate the cost of a single event based on
 | 
			
		||||
	//   some historical data about size of events.
 | 
			
		||||
	var finalSeats uint
 | 
			
		||||
	var additionalLatency time.Duration
 | 
			
		||||
 | 
			
		||||
	// TODO: Make this unconditional after we tune the algorithm better.
 | 
			
		||||
	//   Technically, there is an overhead connected to processing an event after
 | 
			
		||||
	//   the request finishes even if there is a small number of watches.
 | 
			
		||||
	//   However, until we tune the estimation we want to stay on the safe side
 | 
			
		||||
	//   an avoid introducing additional latency for almost every single request.
 | 
			
		||||
	if watchCount >= watchesPerSeat {
 | 
			
		||||
		finalSeats = uint(math.Ceil(float64(watchCount) / watchesPerSeat))
 | 
			
		||||
		additionalLatency = eventAdditionalDuration
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return WorkEstimate{
 | 
			
		||||
		InitialSeats:      1,
 | 
			
		||||
		FinalSeats:        finalSeats,
 | 
			
		||||
		AdditionalLatency: additionalLatency,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -65,12 +65,17 @@ func (we *WorkEstimate) MaxSeats() int {
 | 
			
		|||
// number of objects for a given resource.
 | 
			
		||||
type objectCountGetterFunc func(string) (int64, error)
 | 
			
		||||
 | 
			
		||||
// watchCountGetterFunc represents a function that gets the total
 | 
			
		||||
// number of watchers potentially interested in a given request.
 | 
			
		||||
type watchCountGetterFunc func(*apirequest.RequestInfo) int
 | 
			
		||||
 | 
			
		||||
// NewWorkEstimator estimates the work that will be done by a given request,
 | 
			
		||||
// if no WorkEstimatorFunc matches the given request then the default
 | 
			
		||||
// work estimate of 1 seat is allocated to the request.
 | 
			
		||||
func NewWorkEstimator(countFn objectCountGetterFunc) WorkEstimatorFunc {
 | 
			
		||||
func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc) WorkEstimatorFunc {
 | 
			
		||||
	estimator := &workEstimator{
 | 
			
		||||
		listWorkEstimator: newListWorkEstimator(countFn),
 | 
			
		||||
		listWorkEstimator:     newListWorkEstimator(objectCountFn),
 | 
			
		||||
		mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn),
 | 
			
		||||
	}
 | 
			
		||||
	return estimator.estimate
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -87,6 +92,8 @@ func (e WorkEstimatorFunc) EstimateWork(r *http.Request) WorkEstimate {
 | 
			
		|||
type workEstimator struct {
 | 
			
		||||
	// listWorkEstimator estimates work for list request(s)
 | 
			
		||||
	listWorkEstimator WorkEstimatorFunc
 | 
			
		||||
	// mutatingWorkEstimator calculates the width of mutating request(s)
 | 
			
		||||
	mutatingWorkEstimator WorkEstimatorFunc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (e *workEstimator) estimate(r *http.Request) WorkEstimate {
 | 
			
		||||
| 
						 | 
				
			
			@ -100,6 +107,8 @@ func (e *workEstimator) estimate(r *http.Request) WorkEstimate {
 | 
			
		|||
	switch requestInfo.Verb {
 | 
			
		||||
	case "list":
 | 
			
		||||
		return e.listWorkEstimator.EstimateWork(r)
 | 
			
		||||
	case "create", "update", "patch", "delete":
 | 
			
		||||
		return e.mutatingWorkEstimator.EstimateWork(r)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return WorkEstimate{InitialSeats: minimumSeats}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -20,18 +20,22 @@ import (
 | 
			
		|||
	"errors"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	apirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestWorkEstimator(t *testing.T) {
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name                 string
 | 
			
		||||
		requestURI           string
 | 
			
		||||
		requestInfo          *apirequest.RequestInfo
 | 
			
		||||
		counts               map[string]int64
 | 
			
		||||
		countErr             error
 | 
			
		||||
		initialSeatsExpected uint
 | 
			
		||||
		name                      string
 | 
			
		||||
		requestURI                string
 | 
			
		||||
		requestInfo               *apirequest.RequestInfo
 | 
			
		||||
		counts                    map[string]int64
 | 
			
		||||
		countErr                  error
 | 
			
		||||
		watchCount                int
 | 
			
		||||
		initialSeatsExpected      uint
 | 
			
		||||
		finalSeatsExpected        uint
 | 
			
		||||
		additionalLatencyExpected time.Duration
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name:                 "request has no RequestInfo",
 | 
			
		||||
| 
						 | 
				
			
			@ -248,6 +252,132 @@ func TestWorkEstimator(t *testing.T) {
 | 
			
		|||
			countErr:             errors.New("unknown error"),
 | 
			
		||||
			initialSeatsExpected: maximumSeats,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:       "request verb is create, no watches",
 | 
			
		||||
			requestURI: "http://server/apis/foo.bar/v1/foos",
 | 
			
		||||
			requestInfo: &apirequest.RequestInfo{
 | 
			
		||||
				Verb:     "create",
 | 
			
		||||
				APIGroup: "foo.bar",
 | 
			
		||||
				Resource: "foos",
 | 
			
		||||
			},
 | 
			
		||||
			initialSeatsExpected:      1,
 | 
			
		||||
			finalSeatsExpected:        0,
 | 
			
		||||
			additionalLatencyExpected: 0,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:       "request verb is create, watches registered",
 | 
			
		||||
			requestURI: "http://server/apis/foo.bar/v1/foos",
 | 
			
		||||
			requestInfo: &apirequest.RequestInfo{
 | 
			
		||||
				Verb:     "create",
 | 
			
		||||
				APIGroup: "foo.bar",
 | 
			
		||||
				Resource: "foos",
 | 
			
		||||
			},
 | 
			
		||||
			watchCount:                29,
 | 
			
		||||
			initialSeatsExpected:      1,
 | 
			
		||||
			finalSeatsExpected:        3,
 | 
			
		||||
			additionalLatencyExpected: 5 * time.Millisecond,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:       "request verb is create, watches registered, no additional latency",
 | 
			
		||||
			requestURI: "http://server/apis/foo.bar/v1/foos",
 | 
			
		||||
			requestInfo: &apirequest.RequestInfo{
 | 
			
		||||
				Verb:     "create",
 | 
			
		||||
				APIGroup: "foo.bar",
 | 
			
		||||
				Resource: "foos",
 | 
			
		||||
			},
 | 
			
		||||
			watchCount:                5,
 | 
			
		||||
			initialSeatsExpected:      1,
 | 
			
		||||
			finalSeatsExpected:        0,
 | 
			
		||||
			additionalLatencyExpected: 0,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:       "request verb is create, watches registered, maximum is exceeded",
 | 
			
		||||
			requestURI: "http://server/apis/foo.bar/v1/foos",
 | 
			
		||||
			requestInfo: &apirequest.RequestInfo{
 | 
			
		||||
				Verb:     "create",
 | 
			
		||||
				APIGroup: "foo.bar",
 | 
			
		||||
				Resource: "foos",
 | 
			
		||||
			},
 | 
			
		||||
			watchCount:                199,
 | 
			
		||||
			initialSeatsExpected:      1,
 | 
			
		||||
			finalSeatsExpected:        20,
 | 
			
		||||
			additionalLatencyExpected: 5 * time.Millisecond,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:       "request verb is update, no watches",
 | 
			
		||||
			requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
 | 
			
		||||
			requestInfo: &apirequest.RequestInfo{
 | 
			
		||||
				Verb:     "update",
 | 
			
		||||
				APIGroup: "foo.bar",
 | 
			
		||||
				Resource: "foos",
 | 
			
		||||
			},
 | 
			
		||||
			initialSeatsExpected:      1,
 | 
			
		||||
			finalSeatsExpected:        0,
 | 
			
		||||
			additionalLatencyExpected: 0,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:       "request verb is update, watches registered",
 | 
			
		||||
			requestURI: "http://server/apis/foor.bar/v1/foos/myfoo",
 | 
			
		||||
			requestInfo: &apirequest.RequestInfo{
 | 
			
		||||
				Verb:     "update",
 | 
			
		||||
				APIGroup: "foo.bar",
 | 
			
		||||
				Resource: "foos",
 | 
			
		||||
			},
 | 
			
		||||
			watchCount:                29,
 | 
			
		||||
			initialSeatsExpected:      1,
 | 
			
		||||
			finalSeatsExpected:        3,
 | 
			
		||||
			additionalLatencyExpected: 5 * time.Millisecond,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:       "request verb is patch, no watches",
 | 
			
		||||
			requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
 | 
			
		||||
			requestInfo: &apirequest.RequestInfo{
 | 
			
		||||
				Verb:     "patch",
 | 
			
		||||
				APIGroup: "foo.bar",
 | 
			
		||||
				Resource: "foos",
 | 
			
		||||
			},
 | 
			
		||||
			initialSeatsExpected:      1,
 | 
			
		||||
			finalSeatsExpected:        0,
 | 
			
		||||
			additionalLatencyExpected: 0,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:       "request verb is patch, watches registered",
 | 
			
		||||
			requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
 | 
			
		||||
			requestInfo: &apirequest.RequestInfo{
 | 
			
		||||
				Verb:     "patch",
 | 
			
		||||
				APIGroup: "foo.bar",
 | 
			
		||||
				Resource: "foos",
 | 
			
		||||
			},
 | 
			
		||||
			watchCount:                29,
 | 
			
		||||
			initialSeatsExpected:      1,
 | 
			
		||||
			finalSeatsExpected:        3,
 | 
			
		||||
			additionalLatencyExpected: 5 * time.Millisecond,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:       "request verb is delete, no watches",
 | 
			
		||||
			requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
 | 
			
		||||
			requestInfo: &apirequest.RequestInfo{
 | 
			
		||||
				Verb:     "delete",
 | 
			
		||||
				APIGroup: "foo.bar",
 | 
			
		||||
				Resource: "foos",
 | 
			
		||||
			},
 | 
			
		||||
			initialSeatsExpected:      1,
 | 
			
		||||
			finalSeatsExpected:        0,
 | 
			
		||||
			additionalLatencyExpected: 0,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:       "request verb is delete, watches registered",
 | 
			
		||||
			requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
 | 
			
		||||
			requestInfo: &apirequest.RequestInfo{
 | 
			
		||||
				Verb:     "delete",
 | 
			
		||||
				APIGroup: "foo.bar",
 | 
			
		||||
				Resource: "foos",
 | 
			
		||||
			},
 | 
			
		||||
			watchCount:                29,
 | 
			
		||||
			initialSeatsExpected:      1,
 | 
			
		||||
			finalSeatsExpected:        3,
 | 
			
		||||
			additionalLatencyExpected: 5 * time.Millisecond,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, test := range tests {
 | 
			
		||||
| 
						 | 
				
			
			@ -259,7 +389,10 @@ func TestWorkEstimator(t *testing.T) {
 | 
			
		|||
			countsFn := func(key string) (int64, error) {
 | 
			
		||||
				return counts[key], test.countErr
 | 
			
		||||
			}
 | 
			
		||||
			estimator := NewWorkEstimator(countsFn)
 | 
			
		||||
			watchCountsFn := func(_ *apirequest.RequestInfo) int {
 | 
			
		||||
				return test.watchCount
 | 
			
		||||
			}
 | 
			
		||||
			estimator := NewWorkEstimator(countsFn, watchCountsFn)
 | 
			
		||||
 | 
			
		||||
			req, err := http.NewRequest("GET", test.requestURI, nil)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
| 
						 | 
				
			
			@ -272,7 +405,13 @@ func TestWorkEstimator(t *testing.T) {
 | 
			
		|||
 | 
			
		||||
			workestimateGot := estimator.EstimateWork(req)
 | 
			
		||||
			if test.initialSeatsExpected != workestimateGot.InitialSeats {
 | 
			
		||||
				t.Errorf("Expected work estimate to match: %d seats, but got: %d seats", test.initialSeatsExpected, workestimateGot.InitialSeats)
 | 
			
		||||
				t.Errorf("Expected work estimate to match: %d initial seats, but got: %d", test.initialSeatsExpected, workestimateGot.InitialSeats)
 | 
			
		||||
			}
 | 
			
		||||
			if test.finalSeatsExpected != workestimateGot.FinalSeats {
 | 
			
		||||
				t.Errorf("Expected work estimate to match: %d final seats, but got: %d", test.finalSeatsExpected, workestimateGot.FinalSeats)
 | 
			
		||||
			}
 | 
			
		||||
			if test.additionalLatencyExpected != workestimateGot.AdditionalLatency {
 | 
			
		||||
				t.Errorf("Expected work estimate to match additional latency: %v, but got: %v", test.additionalLatencyExpected, workestimateGot.AdditionalLatency)
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue