diff --git a/pkg/endpoints/handlers/proxy.go b/pkg/endpoints/handlers/proxy.go index f8c1a60e9..c774e38bd 100644 --- a/pkg/endpoints/handlers/proxy.go +++ b/pkg/endpoints/handlers/proxy.go @@ -53,18 +53,22 @@ type ProxyHandler struct { } func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + reqStart := time.Now() proxyHandlerTraceID := rand.Int63() - var verb string - var apiResource, subresource string + var verb, apiResource, subresource, scope string var httpCode int - reqStart := time.Now() + defer func() { + responseLength := 0 + if rw, ok := w.(*metrics.ResponseWriterDelegator); ok { + responseLength = rw.ContentLength() + } metrics.Monitor( - verb, apiResource, subresource, + verb, apiResource, subresource, scope, net.GetHTTPClient(req), w.Header().Get("Content-Type"), - httpCode, reqStart, + httpCode, responseLength, reqStart, ) }() @@ -88,6 +92,10 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } verb = requestInfo.Verb namespace, resource, subresource, parts := requestInfo.Namespace, requestInfo.Resource, requestInfo.Subresource, requestInfo.Parts + scope = "cluster" + if namespace != "" { + scope = "namespace" + } ctx = request.WithNamespace(ctx, namespace) if len(parts) < 2 { diff --git a/pkg/endpoints/installer.go b/pkg/endpoints/installer.go index d740336f0..b8f6a0386 100644 --- a/pkg/endpoints/installer.go +++ b/pkg/endpoints/installer.go @@ -567,6 +567,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag namespaced = "" } + // This variable is calculated for the purpose of instrumentation. + namespaceScope := "cluster" + if namespaced != "" { + namespaceScope = "namespace" + } + if kubeVerb, found := toDiscoveryKubeVerb[action.Verb]; found { if len(kubeVerb) != 0 { kubeVerbs[kubeVerb] = struct{}{} @@ -593,7 +599,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } else { handler = restfulGetResource(getter, exporter, reqScope) } - handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, handler) + handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, handler) if a.enableAPIResponseCompression { handler = genericfilters.RestfulWithCompression(handler, a.group.Context) } @@ -625,7 +631,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "list " + subresource + " of objects of kind " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) if a.enableAPIResponseCompression { handler = genericfilters.RestfulWithCompression(handler, a.group.Context) } @@ -660,7 +666,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "replace " + subresource + " of the specified " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulUpdateResource(updater, reqScope, a.group.Typer, admit)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulUpdateResource(updater, reqScope, a.group.Typer, admit)) route := ws.PUT(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). @@ -676,7 +682,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "partially update " + subresource + " of the specified " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulPatchResource(patcher, reqScope, admit, mapping.ObjectConvertor)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulPatchResource(patcher, reqScope, admit, mapping.ObjectConvertor)) route := ws.PATCH(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). @@ -695,7 +701,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } else { handler = restfulCreateResource(creater, reqScope, a.group.Typer, admit) } - handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, handler) + handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, handler) article := getArticleForNoun(kind, " ") doc := "create" + article + kind if hasSubresource { @@ -717,7 +723,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "delete " + subresource + " of" + article + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit)) route := ws.DELETE(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). @@ -738,7 +744,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "delete collection of " + subresource + " of a " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulDeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulDeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit)) route := ws.DELETE(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). @@ -757,7 +763,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "watch changes to " + subresource + " of an object of kind " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout)) route := ws.GET(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). @@ -776,7 +782,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "watch individual changes to a list of " + subresource + " of " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout)) route := ws.GET(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). @@ -794,20 +800,20 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag // TODO: DEPRECATED in v1.2. case "PROXY": // Proxy requests to a resource. // Accept all methods as per http://issue.k8s.io/3996 - routes = append(routes, buildProxyRoute(ws, "GET", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) - routes = append(routes, buildProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) - routes = append(routes, buildProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) - routes = append(routes, buildProxyRoute(ws, "PATCH", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) - routes = append(routes, buildProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) - routes = append(routes, buildProxyRoute(ws, "HEAD", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) - routes = append(routes, buildProxyRoute(ws, "OPTIONS", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "GET", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "PUT", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "POST", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "PATCH", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "DELETE", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "HEAD", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "OPTIONS", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) case "CONNECT": for _, method := range connecter.ConnectMethods() { doc := "connect " + method + " requests to " + kind if hasSubresource { doc = "connect " + method + " requests to " + subresource + " of " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulConnectResource(connecter, reqScope, admit, path, hasSubresource)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulConnectResource(connecter, reqScope, admit, path, hasSubresource)) route := ws.Method(method).Path(action.Path). To(handler). Doc(doc). @@ -868,12 +874,17 @@ func routeFunction(handler http.Handler) restful.RouteFunction { } } -func buildProxyRoute(ws *restful.WebService, method string, prefix string, path string, proxyHandler http.Handler, namespaced, kind, resource, subresource string, hasSubresource bool, params []*restful.Parameter, operationSuffix string) *restful.RouteBuilder { +func buildProxyRoute(ws *restful.WebService, + method, prefix, path, kind, resource, subresource, namespaced, namespaceScope string, + hasSubresource bool, + params []*restful.Parameter, + proxyHandler http.Handler, + operationSuffix string) *restful.RouteBuilder { doc := "proxy " + method + " requests to " + kind if hasSubresource { doc = "proxy " + method + " requests to " + subresource + " of " + kind } - handler := metrics.InstrumentRouteFunc("PROXY", resource, subresource, routeFunction(proxyHandler)) + handler := metrics.InstrumentRouteFunc("PROXY", resource, subresource, namespaceScope, routeFunction(proxyHandler)) proxyRoute := ws.Method(method).Path(path).To(handler). Doc(doc). Operation("proxy" + strings.Title(method) + namespaced + kind + strings.Title(subresource) + operationSuffix). diff --git a/pkg/endpoints/metrics/metrics.go b/pkg/endpoints/metrics/metrics.go index d26d94d24..a6bf6a862 100644 --- a/pkg/endpoints/metrics/metrics.go +++ b/pkg/endpoints/metrics/metrics.go @@ -18,6 +18,7 @@ package metrics import ( "bufio" + //"fmt" "net" "net/http" "regexp" @@ -26,6 +27,7 @@ import ( "time" utilnet "k8s.io/apimachinery/pkg/util/net" + //utilruntime "k8s.io/apimachinery/pkg/util/runtime" "github.com/emicklei/go-restful" "github.com/prometheus/client_golang/prometheus" @@ -44,7 +46,7 @@ var ( requestLatencies = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "apiserver_request_latencies", - Help: "Response latency distribution in microseconds for each verb, resource and client.", + Help: "Response latency distribution in microseconds for each verb, resource and subresource.", // Use buckets ranging from 125 ms to 8 seconds. Buckets: prometheus.ExponentialBuckets(125000, 2.0, 7), }, @@ -53,12 +55,21 @@ var ( requestLatenciesSummary = prometheus.NewSummaryVec( prometheus.SummaryOpts{ Name: "apiserver_request_latencies_summary", - Help: "Response latency summary in microseconds for each verb and resource.", + Help: "Response latency summary in microseconds for each verb, resource and subresource.", // Make the sliding window of 1h. MaxAge: time.Hour, }, []string{"verb", "resource", "subresource"}, ) + responseSizes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "apiserver_response_sizes", + Help: "Response size distribution in bytes for each verb, resource, subresource and scope (namespace/cluster).", + // Use buckets ranging from 1000 bytes (1KB) to 10^9 bytes (1GB). + Buckets: prometheus.ExponentialBuckets(1000, 10.0, 7), + }, + []string{"verb", "resource", "subresource", "scope"}, + ) kubectlExeRegexp = regexp.MustCompile(`^.*((?i:kubectl\.exe))`) ) @@ -67,20 +78,25 @@ func Register() { prometheus.MustRegister(requestCounter) prometheus.MustRegister(requestLatencies) prometheus.MustRegister(requestLatenciesSummary) + prometheus.MustRegister(responseSizes) } // Monitor records a request to the apiserver endpoints that follow the Kubernetes API conventions. verb must be // uppercase to be backwards compatible with existing monitoring tooling. -func Monitor(verb, resource, subresource, client, contentType string, httpCode int, reqStart time.Time) { +func Monitor(verb, resource, subresource, scope, client, contentType string, httpCode, respSize int, reqStart time.Time) { elapsed := float64((time.Since(reqStart)) / time.Microsecond) requestCounter.WithLabelValues(verb, resource, subresource, client, contentType, codeToString(httpCode)).Inc() requestLatencies.WithLabelValues(verb, resource, subresource).Observe(elapsed) requestLatenciesSummary.WithLabelValues(verb, resource, subresource).Observe(elapsed) + // We are only interested in response sizes of read requests. + if verb == "GET" || verb == "LIST" { + responseSizes.WithLabelValues(verb, resource, subresource, scope).Observe(float64(respSize)) + } } // MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record // a request. verb must be uppercase to be backwards compatible with existing monitoring tooling. -func MonitorRequest(request *http.Request, verb, resource, subresource, contentType string, httpCode int, reqStart time.Time) { +func MonitorRequest(request *http.Request, verb, resource, subresource, scope, contentType string, httpCode, respSize int, reqStart time.Time) { reportedVerb := verb if verb == "LIST" { // see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool @@ -90,23 +106,25 @@ func MonitorRequest(request *http.Request, verb, resource, subresource, contentT } } } + client := cleanUserAgent(utilnet.GetHTTPClient(request)) - Monitor(reportedVerb, resource, subresource, client, contentType, httpCode, reqStart) + Monitor(reportedVerb, resource, subresource, scope, client, contentType, httpCode, respSize, reqStart) } func Reset() { requestCounter.Reset() requestLatencies.Reset() requestLatenciesSummary.Reset() + responseSizes.Reset() } // InstrumentRouteFunc works like Prometheus' InstrumentHandlerFunc but wraps // the go-restful RouteFunction instead of a HandlerFunc -func InstrumentRouteFunc(verb, resource, subresource string, routeFunc restful.RouteFunction) restful.RouteFunction { +func InstrumentRouteFunc(verb, resource, subresource, scope string, routeFunc restful.RouteFunction) restful.RouteFunction { return restful.RouteFunction(func(request *restful.Request, response *restful.Response) { now := time.Now() - delegate := &responseWriterDelegator{ResponseWriter: response.ResponseWriter} + delegate := &ResponseWriterDelegator{ResponseWriter: response.ResponseWriter} _, cn := response.ResponseWriter.(http.CloseNotifier) _, fl := response.ResponseWriter.(http.Flusher) @@ -121,7 +139,7 @@ func InstrumentRouteFunc(verb, resource, subresource string, routeFunc restful.R routeFunc(request, response) - MonitorRequest(request.Request, verb, resource, subresource, rw.Header().Get("Content-Type"), delegate.status, now) + MonitorRequest(request.Request, verb, resource, subresource, scope, delegate.Header().Get("Content-Type"), delegate.Status(), delegate.ContentLength(), now) }) } @@ -135,7 +153,8 @@ func cleanUserAgent(ua string) string { return ua } -type responseWriterDelegator struct { +// ResponseWriterDelegator interface wraps http.ResponseWriter to additionally record content-length, status-code, etc. +type ResponseWriterDelegator struct { http.ResponseWriter status int @@ -143,13 +162,13 @@ type responseWriterDelegator struct { wroteHeader bool } -func (r *responseWriterDelegator) WriteHeader(code int) { +func (r *ResponseWriterDelegator) WriteHeader(code int) { r.status = code r.wroteHeader = true r.ResponseWriter.WriteHeader(code) } -func (r *responseWriterDelegator) Write(b []byte) (int, error) { +func (r *ResponseWriterDelegator) Write(b []byte) (int, error) { if !r.wroteHeader { r.WriteHeader(http.StatusOK) } @@ -158,8 +177,16 @@ func (r *responseWriterDelegator) Write(b []byte) (int, error) { return n, err } +func (r *ResponseWriterDelegator) Status() int { + return r.status +} + +func (r *ResponseWriterDelegator) ContentLength() int { + return int(r.written) +} + type fancyResponseWriterDelegator struct { - *responseWriterDelegator + *ResponseWriterDelegator } func (f *fancyResponseWriterDelegator) CloseNotify() <-chan bool { diff --git a/pkg/server/filters/maxinflight.go b/pkg/server/filters/maxinflight.go index 960d400d7..3e159c21f 100644 --- a/pkg/server/filters/maxinflight.go +++ b/pkg/server/filters/maxinflight.go @@ -109,7 +109,11 @@ func WithMaxInFlightLimit( } } } - metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", errors.StatusTooManyRequests, time.Now()) + scope := "cluster" + if requestInfo.Namespace != "" { + scope = "namespace" + } + metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", scope, errors.StatusTooManyRequests, 0, time.Now()) tooManyRequests(r, w) } } diff --git a/pkg/server/filters/timeout.go b/pkg/server/filters/timeout.go index 341670bc9..e3e21a5f0 100644 --- a/pkg/server/filters/timeout.go +++ b/pkg/server/filters/timeout.go @@ -60,7 +60,11 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa } now := time.Now() metricFn := func() { - metrics.MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", http.StatusInternalServerError, now) + scope := "cluster" + if requestInfo.Namespace != "" { + scope = "namespace" + } + metrics.MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", scope, http.StatusInternalServerError, 0, now) } return time.After(globalTimeout), metricFn, apierrors.NewServerTimeout(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb, 0) }