add compression to GET and LIST api requests

this feature is gated; disabled by default

Kubernetes-commit: c305f72315a83c16c40fbbfd06b563f9e67208ff
This commit is contained in:
Scott Weiss 2017-06-14 10:45:06 -04:00 committed by Kubernetes Publisher
parent a238a912d3
commit b74e5942e2
10 changed files with 534 additions and 22 deletions

View File

@ -55,6 +55,7 @@ go_test(
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/testing:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
],
)
@ -84,5 +85,6 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
],
)

View File

@ -18,6 +18,7 @@ package endpoints
import (
"bytes"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
@ -66,6 +67,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/request"
genericapitesting "k8s.io/apiserver/pkg/endpoints/testing"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/server/filters"
)
// alwaysAdmit is an implementation of admission.Interface which always says yes to an admit request.
@ -1205,6 +1207,110 @@ func TestRequestsWithInvalidQuery(t *testing.T) {
}
}
func TestListCompression(t *testing.T) {
testCases := []struct {
url string
namespace string
selfLink string
legacy bool
label string
field string
acceptEncoding string
}{
// list items in a namespace in the path
{
url: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
namespace: "default",
selfLink: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
acceptEncoding: "",
},
{
url: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
namespace: "default",
selfLink: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
acceptEncoding: "gzip",
},
}
for i, testCase := range testCases {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{expectedResourceNamespace: testCase.namespace}
storage["simple"] = &simpleStorage
selfLinker := &setTestSelfLinker{
t: t,
namespace: testCase.namespace,
expectedSet: testCase.selfLink,
}
var handler = handleInternal(storage, admissionControl, selfLinker, nil)
requestContextMapper = request.NewRequestContextMapper()
handler = filters.WithCompression(handler, requestContextMapper)
handler = genericapifilters.WithRequestInfo(handler, newTestRequestInfoResolver(), requestContextMapper)
handler = request.WithRequestContext(handler, requestContextMapper)
server := httptest.NewServer(handler)
defer server.Close()
req, err := http.NewRequest("GET", server.URL+testCase.url, nil)
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
// It's necessary to manually set Accept-Encoding here
// to prevent http.DefaultClient from automatically
// decoding responses
req.Header.Set("Accept-Encoding", testCase.acceptEncoding)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("%d: unexpected status: %d from url %s, Expected: %d, %#v", i, resp.StatusCode, testCase.url, http.StatusOK, resp)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
t.Logf("%d: body: %s", i, string(body))
continue
}
// TODO: future, restore get links
if !selfLinker.called {
t.Errorf("%d: never set self link", i)
}
if !simpleStorage.namespacePresent {
t.Errorf("%d: namespace not set", i)
} else if simpleStorage.actualNamespace != testCase.namespace {
t.Errorf("%d: %q unexpected resource namespace: %s", i, testCase.url, simpleStorage.actualNamespace)
}
if simpleStorage.requestedLabelSelector == nil || simpleStorage.requestedLabelSelector.String() != testCase.label {
t.Errorf("%d: unexpected label selector: %v", i, simpleStorage.requestedLabelSelector)
}
if simpleStorage.requestedFieldSelector == nil || simpleStorage.requestedFieldSelector.String() != testCase.field {
t.Errorf("%d: unexpected field selector: %v", i, simpleStorage.requestedFieldSelector)
}
var decoder *json.Decoder
if testCase.acceptEncoding == "gzip" {
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
t.Fatalf("unexpected error creating gzip reader: %v", err)
}
decoder = json.NewDecoder(gzipReader)
} else {
decoder = json.NewDecoder(resp.Body)
}
var itemOut genericapitesting.SimpleList
err = decoder.Decode(&itemOut)
if err != nil {
t.Errorf("failed to read response body as SimpleList: %v", err)
}
}
}
func TestLogs(t *testing.T) {
handler := handle(map[string]rest.Storage{})
server := httptest.NewServer(handler)
@ -1520,6 +1626,82 @@ func TestGet(t *testing.T) {
}
}
func TestGetCompression(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
item: genericapitesting.Simple{
Other: "foo",
},
}
selfLinker := &setTestSelfLinker{
t: t,
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id",
name: "id",
namespace: "default",
}
requestContextMapper = request.NewRequestContextMapper()
storage["simple"] = &simpleStorage
handler := handleLinker(storage, selfLinker)
handler = filters.WithCompression(handler, requestContextMapper)
handler = genericapifilters.WithRequestInfo(handler, newTestRequestInfoResolver(), requestContextMapper)
handler = request.WithRequestContext(handler, requestContextMapper)
server := httptest.NewServer(handler)
defer server.Close()
tests := []struct {
acceptEncoding string
}{
{acceptEncoding: ""},
{acceptEncoding: "gzip"},
}
for _, test := range tests {
req, err := http.NewRequest("GET", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/id", nil)
if err != nil {
t.Fatalf("unexpected error cretaing request: %v", err)
}
// It's necessary to manually set Accept-Encoding here
// to prevent http.DefaultClient from automatically
// decoding responses
req.Header.Set("Accept-Encoding", test.acceptEncoding)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected response: %#v", resp)
}
var decoder *json.Decoder
if test.acceptEncoding == "gzip" {
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
t.Fatalf("unexpected error creating gzip reader: %v", err)
}
decoder = json.NewDecoder(gzipReader)
} else {
decoder = json.NewDecoder(resp.Body)
}
var itemOut genericapitesting.Simple
err = decoder.Decode(&itemOut)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("unexpected error reading body: %v", err)
}
if itemOut.Name != simpleStorage.item.Name {
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body))
}
if !selfLinker.called {
t.Errorf("Never set self link")
}
}
}
func TestGetUninitialized(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{

View File

@ -87,6 +87,10 @@ type APIGroupVersion struct {
// ResourceLister is an interface that knows how to list resources
// for this API Group.
ResourceLister discovery.APIResourceLister
// EnableAPIResponseCompression indicates whether API Responses should support compression
// if the client requests it via Accept-Encoding
EnableAPIResponseCompression bool
}
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
@ -138,9 +142,10 @@ func (g *APIGroupVersion) UpdateREST(container *restful.Container) error {
func (g *APIGroupVersion) newInstaller() *APIInstaller {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
enableAPIResponseCompression: g.EnableAPIResponseCompression,
}
return installer
}

View File

@ -40,6 +40,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
genericfilters "k8s.io/apiserver/pkg/server/filters"
)
const (
@ -48,9 +49,10 @@ const (
)
type APIInstaller struct {
group *APIGroupVersion
prefix string // Path prefix where API resources are to be registered.
minRequestTimeout time.Duration
group *APIGroupVersion
prefix string // Path prefix where API resources are to be registered.
minRequestTimeout time.Duration
enableAPIResponseCompression bool
}
// Struct capturing information about an action ("GET", "POST", "WATCH", "PROXY", etc).
@ -584,6 +586,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
handler = restfulGetResource(getter, exporter, reqScope)
}
handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, handler)
if a.enableAPIResponseCompression {
handler = genericfilters.RestfulWithCompression(handler, a.group.Context)
}
doc := "read the specified " + kind
if hasSubresource {
doc = "read " + subresource + " of the specified " + kind
@ -613,6 +618,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
doc = "list " + subresource + " of objects of kind " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
if a.enableAPIResponseCompression {
handler = genericfilters.RestfulWithCompression(handler, a.group.Context)
}
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).

View File

@ -41,6 +41,12 @@ const (
// pluggable output backends and an audit policy specifying how different requests should be
// audited.
AdvancedAuditing utilfeature.Feature = "AdvancedAuditing"
// owner: @ilackams
// alpha: v1.7
//
// Enables compression of REST responses (GET and LIST only)
APIResponseCompression utilfeature.Feature = "APIResponseCompression"
)
func init() {
@ -53,4 +59,5 @@ func init() {
var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureSpec{
StreamingProxyRedirects: {Default: true, PreRelease: utilfeature.Beta},
AdvancedAuditing: {Default: false, PreRelease: utilfeature.Alpha},
APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha},
}

View File

@ -160,6 +160,10 @@ type Config struct {
// Predicate which is true for paths of long-running http requests
LongRunningFunc apirequest.LongRunningRequestCheck
// EnableAPIResponseCompression indicates whether API Responses should support compression
// if the client requests it via Accept-Encoding
EnableAPIResponseCompression bool
//===========================================================================
// values below here are targets for removal
//===========================================================================
@ -206,19 +210,20 @@ type SecureServingInfo struct {
// NewConfig returns a Config struct with the default values
func NewConfig(codecs serializer.CodecFactory) *Config {
return &Config{
Serializer: codecs,
ReadWritePort: 443,
RequestContextMapper: apirequest.NewRequestContextMapper(),
BuildHandlerChainFunc: DefaultBuildHandlerChain,
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
DisabledPostStartHooks: sets.NewString(),
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
EnableIndex: true,
EnableDiscovery: true,
EnableProfiling: true,
MaxRequestsInFlight: 400,
MaxMutatingRequestsInFlight: 200,
MinRequestTimeout: 1800,
Serializer: codecs,
ReadWritePort: 443,
RequestContextMapper: apirequest.NewRequestContextMapper(),
BuildHandlerChainFunc: DefaultBuildHandlerChain,
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
DisabledPostStartHooks: sets.NewString(),
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
EnableIndex: true,
EnableDiscovery: true,
EnableProfiling: true,
MaxRequestsInFlight: 400,
MaxMutatingRequestsInFlight: 200,
MinRequestTimeout: 1800,
EnableAPIResponseCompression: utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression),
// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
@ -412,6 +417,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
healthzChecks: c.HealthzChecks,
DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer, c.RequestContextMapper),
enableAPIResponseCompression: c.EnableAPIResponseCompression,
}
for k, v := range delegationTarget.PostStartHooks() {

View File

@ -11,6 +11,7 @@ load(
go_test(
name = "go_default_test",
srcs = [
"compression_test.go",
"cors_test.go",
"maxinflight_test.go",
"timeout_test.go",
@ -31,6 +32,7 @@ go_test(
go_library(
name = "go_default_library",
srcs = [
"compression.go",
"cors.go",
"doc.go",
"longrunning.go",
@ -40,6 +42,7 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//vendor/github.com/emicklei/go-restful:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",

View File

@ -0,0 +1,183 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"compress/gzip"
"compress/zlib"
"errors"
"fmt"
"io"
"net/http"
"strings"
"github.com/emicklei/go-restful"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/endpoints/request"
)
// Compressor is an interface to compression writers
type Compressor interface {
io.WriteCloser
Flush() error
}
const (
headerAcceptEncoding = "Accept-Encoding"
headerContentEncoding = "Content-Encoding"
encodingGzip = "gzip"
encodingDeflate = "deflate"
)
// WithCompression wraps an http.Handler with the Compression Handler
func WithCompression(handler http.Handler, ctxMapper request.RequestContextMapper) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
wantsCompression, encoding := wantsCompressedResponse(req, ctxMapper)
w.Header().Set("Vary", "Accept-Encoding")
if wantsCompression {
compressionWriter, err := NewCompressionResponseWriter(w, encoding)
if err != nil {
handleError(w, req, err)
runtime.HandleError(fmt.Errorf("failed to compress HTTP response: %v", err))
return
}
compressionWriter.Header().Set("Content-Encoding", encoding)
handler.ServeHTTP(compressionWriter, req)
compressionWriter.(*compressionResponseWriter).Close()
} else {
handler.ServeHTTP(w, req)
}
})
}
// wantsCompressedResponse reads the Accept-Encoding header to see if and which encoding is requested.
func wantsCompressedResponse(req *http.Request, ctxMapper request.RequestContextMapper) (bool, string) {
// don't compress watches
ctx, ok := ctxMapper.Get(req)
if !ok {
return false, ""
}
info, ok := request.RequestInfoFrom(ctx)
if !ok {
return false, ""
}
if !info.IsResourceRequest {
return false, ""
}
if info.Verb == "watch" {
return false, ""
}
header := req.Header.Get(headerAcceptEncoding)
gi := strings.Index(header, encodingGzip)
zi := strings.Index(header, encodingDeflate)
// use in order of appearance
switch {
case gi == -1:
return zi != -1, encodingDeflate
case zi == -1:
return gi != -1, encodingGzip
case gi < zi:
return true, encodingGzip
default:
return true, encodingDeflate
}
}
type compressionResponseWriter struct {
writer http.ResponseWriter
compressor Compressor
encoding string
}
// NewCompressionResponseWriter returns wraps w with a compression ResponseWriter, using the given encoding
func NewCompressionResponseWriter(w http.ResponseWriter, encoding string) (http.ResponseWriter, error) {
var compressor Compressor
switch encoding {
case encodingGzip:
compressor = gzip.NewWriter(w)
case encodingDeflate:
compressor = zlib.NewWriter(w)
default:
return nil, fmt.Errorf("%s is not a supported encoding type", encoding)
}
return &compressionResponseWriter{
writer: w,
compressor: compressor,
encoding: encoding,
}, nil
}
// compressionResponseWriter implements http.Responsewriter Interface
var _ http.ResponseWriter = &compressionResponseWriter{}
func (c *compressionResponseWriter) Header() http.Header {
return c.writer.Header()
}
// compress data according to compression method
func (c *compressionResponseWriter) Write(p []byte) (int, error) {
if c.compressorClosed() {
return -1, errors.New("compressing error: tried to write data using closed compressor")
}
c.Header().Set(headerContentEncoding, c.encoding)
return c.compressor.Write(p)
}
func (c *compressionResponseWriter) WriteHeader(status int) {
c.writer.WriteHeader(status)
}
// CloseNotify is part of http.CloseNotifier interface
func (c *compressionResponseWriter) CloseNotify() <-chan bool {
return c.writer.(http.CloseNotifier).CloseNotify()
}
// Close the underlying compressor
func (c *compressionResponseWriter) Close() error {
if c.compressorClosed() {
return errors.New("Compressing error: tried to close already closed compressor")
}
c.compressor.Close()
c.compressor = nil
return nil
}
func (c *compressionResponseWriter) Flush() {
if c.compressorClosed() {
return
}
c.compressor.Flush()
}
func (c *compressionResponseWriter) compressorClosed() bool {
return nil == c.compressor
}
// RestfulWithCompression wraps WithCompression to be compatible with go-restful
func RestfulWithCompression(function restful.RouteFunction, ctxMapper request.RequestContextMapper) restful.RouteFunction {
return restful.RouteFunction(func(request *restful.Request, response *restful.Response) {
handler := WithCompression(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
response.ResponseWriter = w
request.Request = req
function(request, response)
}), ctxMapper)
handler.ServeHTTP(response.ResponseWriter, request.Request)
})
}

View File

@ -0,0 +1,110 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"bytes"
"compress/gzip"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/endpoints/request"
)
func TestCompression(t *testing.T) {
tests := []struct {
encoding string
watch bool
}{
{"", false},
{"gzip", true},
{"gzip", false},
}
responseData := []byte("1234")
requestContextMapper := request.NewRequestContextMapper()
for _, test := range tests {
handler := WithCompression(
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write(responseData)
}),
requestContextMapper,
)
handler = filters.WithRequestInfo(handler, newTestRequestInfoResolver(), requestContextMapper)
handler = request.WithRequestContext(handler, requestContextMapper)
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{
Transport: &http.Transport{
DisableCompression: true,
},
}
url := server.URL + "/api/v1/pods"
if test.watch {
url = url + "?watch=1"
}
request, err := http.NewRequest("GET", url, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
request.Header.Set("Accept-Encoding", test.encoding)
response, err := client.Do(request)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
var reader io.Reader
if test.encoding == "gzip" && !test.watch {
if response.Header.Get("Content-Encoding") != "gzip" {
t.Fatal("expected response header Content-Encoding to be set to \"gzip\"")
}
if response.Header.Get("Vary") != "Accept-Encoding" {
t.Fatal("expected response header Vary to be set to \"Accept-Encoding\"")
}
reader, err = gzip.NewReader(response.Body)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
} else {
if response.Header.Get("Content-Encoding") == "gzip" {
t.Fatal("expected response header Content-Encoding not to be set")
}
reader = response.Body
}
body, err := ioutil.ReadAll(reader)
if err != nil {
t.Fatal("unexpected error: %v", err)
}
if !bytes.Equal(body, responseData) {
t.Fatalf("Expected response body %s to equal %s", body, responseData)
}
}
}
func newTestRequestInfoResolver() *request.RequestInfoFactory {
return &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis"),
GrouplessAPIPrefixes: sets.NewString("api"),
}
}

View File

@ -150,6 +150,10 @@ type GenericAPIServer struct {
// auditing. The backend is started after the server starts listening.
AuditBackend audit.Backend
// enableAPIResponseCompression indicates whether API Responses should support compression
// if the client requests it via Accept-Encoding
enableAPIResponseCompression bool
}
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
@ -431,9 +435,10 @@ func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
Linker: apiGroupInfo.GroupMeta.SelfLinker,
Mapper: apiGroupInfo.GroupMeta.RESTMapper,
Admit: s.admissionControl,
Context: s.RequestContextMapper(),
MinRequestTimeout: s.minRequestTimeout,
Admit: s.admissionControl,
Context: s.RequestContextMapper(),
MinRequestTimeout: s.minRequestTimeout,
EnableAPIResponseCompression: s.enableAPIResponseCompression,
}
}