Implement streaming proto encoding

Kubernetes-commit: f5dd7107f7144c4f76ca6159c1eeddb48a12feaa
This commit is contained in:
Marek Siarkowicz 2024-12-19 12:30:39 +01:00 committed by Kubernetes Publisher
parent b2bc62b37f
commit 67ec836891
4 changed files with 44 additions and 4 deletions

View File

@ -157,9 +157,9 @@ const (
// (usually the entire object), and if the size is smaller no gzipping will be performed
// if the client requests it.
defaultGzipThresholdBytes = 128 * 1024
// Use the length of the first write of streaming implementations.
// TODO: Update when streaming proto is implemented
firstWriteStreamingThresholdBytes = 1
// Use the length of the first write to recognize streaming implementations.
// When streaming JSON first write is "{", while Kubernetes protobuf starts unique 4 byte header.
firstWriteStreamingThresholdBytes = 4
)
// negotiateContentEncoding returns a supported client-requested content encoding for the

View File

@ -44,6 +44,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
jsonserializer "k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/protobuf"
rand2 "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apiserver/pkg/features"
@ -845,6 +846,34 @@ func TestStreamingGzipIntegration(t *testing.T) {
expectGzip: true,
expectStreaming: true,
},
{
name: "Protobuf, small object, default -> no gzip",
serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{}),
object: &testapigroupv1.CarpList{},
expectGzip: false,
expectStreaming: false,
},
{
name: "Protobuf, small object, streaming -> no gzip",
serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{StreamingCollectionsEncoding: true}),
object: &testapigroupv1.CarpList{},
expectGzip: false,
expectStreaming: true,
},
{
name: "Protobuf, large object, default -> gzip",
serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{}),
object: &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
expectGzip: true,
expectStreaming: false,
},
{
name: "Protobuf, large object, streaming -> gzip",
serializer: protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{StreamingCollectionsEncoding: true}),
object: &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
expectGzip: true,
expectStreaming: true,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {

View File

@ -197,9 +197,13 @@ const (
StorageVersionHash featuregate.Feature = "StorageVersionHash"
// owner: @serathius
// Allow API server to encode collections item by item, instead of all at once.
// Allow API server JSON encoder to encode collections item by item, instead of all at once.
StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
// owner: @serathius
// Allow API server Protobuf encoder to encode collections item by item, instead of all at once.
StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf"
// owner: @aramase, @enj, @nabokihms
// kep: https://kep.k8s.io/3331
//
@ -356,6 +360,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
},
StreamingCollectionEncodingToProtobuf: {
{Version: version.MustParse("1.33"), Default: true, PreRelease: featuregate.Beta},
},
StrictCostEnforcementForVAP: {
{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Beta},
{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.GA, LockToDefault: true},

View File

@ -992,6 +992,9 @@ func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
}
if len(opts) != 0 {
codecs = serializer.NewCodecFactory(scheme, opts...)
}