Streaming JSON encoder for List
Kubernetes-commit: e7c743b2ebfaed1e3132027c0369ac25b14b6f47
This commit is contained in:
parent
f2b1ab6bbc
commit
38b01a1f78
|
@ -39,8 +39,11 @@ import (
|
|||
"github.com/google/go-cmp/cmp"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
kerrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
jsonserializer "k8s.io/apimachinery/pkg/runtime/serializer/json"
|
||||
rand2 "k8s.io/apimachinery/pkg/util/rand"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
|
@ -804,3 +807,80 @@ func gzipContent(data []byte, level int) []byte {
|
|||
}
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
||||
func TestStreamingGzipIntegration(t *testing.T) {
|
||||
largeChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes+1)
|
||||
tcs := []struct {
|
||||
name string
|
||||
serializer runtime.Encoder
|
||||
object runtime.Object
|
||||
expectGzip bool
|
||||
expectStreaming bool
|
||||
}{
|
||||
{
|
||||
name: "JSON, small object, default -> no gzip",
|
||||
serializer: jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{}),
|
||||
object: &testapigroupv1.CarpList{},
|
||||
expectGzip: false,
|
||||
expectStreaming: false,
|
||||
},
|
||||
{
|
||||
name: "JSON, small object, streaming -> no gzip",
|
||||
serializer: jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{StreamingCollectionsEncoding: true}),
|
||||
object: &testapigroupv1.CarpList{},
|
||||
expectGzip: false,
|
||||
expectStreaming: true,
|
||||
},
|
||||
{
|
||||
name: "JSON, large object, default -> gzip",
|
||||
serializer: jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{}),
|
||||
object: &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
|
||||
expectGzip: true,
|
||||
expectStreaming: false,
|
||||
},
|
||||
{
|
||||
name: "JSON, large object, streaming -> gzip",
|
||||
serializer: jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{StreamingCollectionsEncoding: true}),
|
||||
object: &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
|
||||
expectGzip: true,
|
||||
expectStreaming: true,
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
mockResponseWriter := httptest.NewRecorder()
|
||||
drw := &deferredResponseWriter{
|
||||
mediaType: "text/plain",
|
||||
statusCode: 200,
|
||||
contentEncoding: "gzip",
|
||||
hw: mockResponseWriter,
|
||||
ctx: context.Background(),
|
||||
}
|
||||
counter := &writeCounter{Writer: drw}
|
||||
err := tc.serializer.Encode(tc.object, counter)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
encoding := mockResponseWriter.Header().Get("Content-Encoding")
|
||||
if (encoding == "gzip") != tc.expectGzip {
|
||||
t.Errorf("Expect gzip: %v, got: %q", tc.expectGzip, encoding)
|
||||
}
|
||||
if counter.writeCount < 1 {
|
||||
t.Fatalf("Expect at least 1 write")
|
||||
}
|
||||
if (counter.writeCount > 1) != tc.expectStreaming {
|
||||
t.Errorf("Expect streaming: %v, got write count: %d", tc.expectStreaming, counter.writeCount)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type writeCounter struct {
|
||||
writeCount int
|
||||
io.Writer
|
||||
}
|
||||
|
||||
func (b *writeCounter) Write(data []byte) (int, error) {
|
||||
b.writeCount++
|
||||
return b.Writer.Write(data)
|
||||
}
|
||||
|
|
|
@ -217,6 +217,10 @@ const (
|
|||
// document.
|
||||
StorageVersionHash featuregate.Feature = "StorageVersionHash"
|
||||
|
||||
// owner: @serathius
|
||||
// Allow API server to encode collections item by item, instead of all at once.
|
||||
StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
|
||||
|
||||
// owner: @aramase, @enj, @nabokihms
|
||||
// kep: https://kep.k8s.io/3331
|
||||
//
|
||||
|
@ -387,6 +391,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
|
|||
{Version: version.MustParse("1.15"), Default: true, PreRelease: featuregate.Beta},
|
||||
},
|
||||
|
||||
StreamingCollectionEncodingToJSON: {
|
||||
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
|
||||
},
|
||||
|
||||
StrictCostEnforcementForVAP: {
|
||||
{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Beta},
|
||||
{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.GA, LockToDefault: true},
|
||||
|
|
|
@ -991,8 +991,15 @@ func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
|
|||
// NewDefaultAPIGroupInfo returns an APIGroupInfo stubbed with "normal" values
|
||||
// exposed for easier composition from other packages
|
||||
func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
|
||||
opts := []serializer.CodecFactoryOptionsMutator{}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
|
||||
codecs = serializer.NewCodecFactory(scheme, serializer.WithSerializer(cbor.NewSerializerInfo))
|
||||
opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
|
||||
}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
|
||||
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
|
||||
}
|
||||
if len(opts) != 0 {
|
||||
codecs = serializer.NewCodecFactory(scheme, opts...)
|
||||
}
|
||||
return APIGroupInfo{
|
||||
PrioritizedVersions: scheme.PrioritizedVersionsForGroup(group),
|
||||
|
|
Loading…
Reference in New Issue