Merge pull request #130281 from z1cheng/issue_130264

Implement chunking for gzip encoder in deferredResponseWriter

Kubernetes-commit: 25dc6c98209b50db1f0a023020003a4051b06138
This commit is contained in:
Kubernetes Publisher 2025-02-26 10:16:36 -08:00
commit 205c0f56b5
2 changed files with 215 additions and 30 deletions

View File

@ -157,6 +157,9 @@ const (
// (usually the entire object), and if the size is smaller no gzipping will be performed
// if the client requests it.
defaultGzipThresholdBytes = 128 * 1024
// Use the length of the first write of streaming implementations.
// TODO: Update when streaming proto is implemented
firstWriteStreamingThresholdBytes = 1
)
// negotiateContentEncoding returns a supported client-requested content encoding for the
@ -192,14 +195,53 @@ type deferredResponseWriter struct {
statusCode int
contentEncoding string
hasWritten bool
hw http.ResponseWriter
w io.Writer
hasBuffered bool
buffer []byte
hasWritten bool
hw http.ResponseWriter
w io.Writer
ctx context.Context
}
func (w *deferredResponseWriter) Write(p []byte) (n int, err error) {
switch {
case w.hasWritten:
// already written, cannot buffer
return w.unbufferedWrite(p)
case w.contentEncoding != "gzip":
// non-gzip, no need to buffer
return w.unbufferedWrite(p)
case !w.hasBuffered && len(p) > defaultGzipThresholdBytes:
// not yet buffered, first write is long enough to trigger gzip, no need to buffer
return w.unbufferedWrite(p)
case !w.hasBuffered && len(p) > firstWriteStreamingThresholdBytes:
// not yet buffered, first write is longer than expected for streaming scenarios that would require buffering, no need to buffer
return w.unbufferedWrite(p)
default:
if !w.hasBuffered {
w.hasBuffered = true
// Start at 80 bytes to avoid rapid reallocation of the buffer.
// The minimum size of a 0-item serialized list object is 80 bytes:
// {"kind":"List","apiVersion":"v1","metadata":{"resourceVersion":"1"},"items":[]}\n
w.buffer = make([]byte, 0, max(80, len(p)))
}
w.buffer = append(w.buffer, p...)
var err error
if len(w.buffer) > defaultGzipThresholdBytes {
// we've accumulated enough to trigger gzip, write and clear buffer
_, err = w.unbufferedWrite(w.buffer)
w.buffer = nil
}
return len(p), err
}
}
func (w *deferredResponseWriter) unbufferedWrite(p []byte) (n int, err error) {
ctx := w.ctx
span := tracing.SpanFromContext(ctx)
// This Step usually wraps in-memory object serialization.
@ -245,11 +287,17 @@ func (w *deferredResponseWriter) Write(p []byte) (n int, err error) {
return w.w.Write(p)
}
func (w *deferredResponseWriter) Close() error {
func (w *deferredResponseWriter) Close() (err error) {
if !w.hasWritten {
return nil
if !w.hasBuffered {
return nil
}
// never reached defaultGzipThresholdBytes, no need to do the gzip writer cleanup
_, err := w.unbufferedWrite(w.buffer)
w.buffer = nil
return err
}
var err error
switch t := w.w.(type) {
case *gzip.Writer:
err = t.Close()

View File

@ -33,7 +33,6 @@ import (
"os"
"reflect"
"strconv"
"strings"
"testing"
"time"
@ -42,6 +41,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
rand2 "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -378,29 +378,94 @@ func TestDeferredResponseWriter_Write(t *testing.T) {
largeChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes+1)
tests := []struct {
name string
chunks [][]byte
expectGzip bool
name string
chunks [][]byte
expectGzip bool
expectHeaders http.Header
}{
{
name: "no writes",
chunks: nil,
expectGzip: false,
expectHeaders: http.Header{},
},
{
name: "one empty write",
chunks: [][]byte{{}},
expectGzip: false,
expectHeaders: http.Header{
"Content-Type": []string{"text/plain"},
},
},
{
name: "one single byte write",
chunks: [][]byte{{'{'}},
expectGzip: false,
expectHeaders: http.Header{
"Content-Type": []string{"text/plain"},
},
},
{
name: "one small chunk write",
chunks: [][]byte{smallChunk},
expectGzip: false,
expectHeaders: http.Header{
"Content-Type": []string{"text/plain"},
},
},
{
name: "two small chunk writes",
chunks: [][]byte{smallChunk, smallChunk},
expectGzip: false,
expectHeaders: http.Header{
"Content-Type": []string{"text/plain"},
},
},
{
name: "one single byte and one small chunk write",
chunks: [][]byte{{'{'}, smallChunk},
expectGzip: false,
expectHeaders: http.Header{
"Content-Type": []string{"text/plain"},
},
},
{
name: "two single bytes and one small chunk write",
chunks: [][]byte{{'{'}, {'{'}, smallChunk},
expectGzip: true,
expectHeaders: http.Header{
"Content-Type": []string{"text/plain"},
"Content-Encoding": []string{"gzip"},
"Vary": []string{"Accept-Encoding"},
},
},
{
name: "one large chunk writes",
chunks: [][]byte{largeChunk},
expectGzip: true,
expectHeaders: http.Header{
"Content-Type": []string{"text/plain"},
"Content-Encoding": []string{"gzip"},
"Vary": []string{"Accept-Encoding"},
},
},
{
name: "two large chunk writes",
chunks: [][]byte{largeChunk, largeChunk},
expectGzip: true,
expectHeaders: http.Header{
"Content-Type": []string{"text/plain"},
"Content-Encoding": []string{"gzip"},
"Vary": []string{"Accept-Encoding"},
},
},
{
name: "one small chunk and one large chunk write",
chunks: [][]byte{smallChunk, largeChunk},
expectGzip: false,
expectHeaders: http.Header{
"Content-Type": []string{"text/plain"},
},
},
}
@ -441,8 +506,9 @@ func TestDeferredResponseWriter_Write(t *testing.T) {
if res.StatusCode != http.StatusOK {
t.Fatalf("status code is not writtend properly, expected: 200, got: %d", res.StatusCode)
}
contentEncoding := res.Header.Get("Content-Encoding")
varyHeader := res.Header.Get("Vary")
if !reflect.DeepEqual(res.Header, tt.expectHeaders) {
t.Fatal(cmp.Diff(tt.expectHeaders, res.Header))
}
resBytes, err := io.ReadAll(res.Body)
if err != nil {
@ -450,14 +516,6 @@ func TestDeferredResponseWriter_Write(t *testing.T) {
}
if tt.expectGzip {
if contentEncoding != "gzip" {
t.Fatalf("content-encoding is not set properly, expected: gzip, got: %s", contentEncoding)
}
if !strings.Contains(varyHeader, "Accept-Encoding") {
t.Errorf("vary header doesn't have Accept-Encoding")
}
gr, err := gzip.NewReader(bytes.NewReader(resBytes))
if err != nil {
t.Fatalf("failed to create gzip reader: %v", err)
@ -471,22 +529,101 @@ func TestDeferredResponseWriter_Write(t *testing.T) {
if !bytes.Equal(fullPayload, decompressed) {
t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, decompressed)
}
} else {
if contentEncoding != "" {
t.Errorf("content-encoding is set unexpectedly")
}
if strings.Contains(varyHeader, "Accept-Encoding") {
t.Errorf("accept encoding is set unexpectedly")
}
if !bytes.Equal(fullPayload, resBytes) {
t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, resBytes)
}
}
})
}
}
func benchmarkChunkingGzip(b *testing.B, count int, chunk []byte) {
mockResponseWriter := httptest.NewRecorder()
mockResponseWriter.Body = nil
drw := &deferredResponseWriter{
mediaType: "text/plain",
statusCode: 200,
contentEncoding: "gzip",
hw: mockResponseWriter,
ctx: context.Background(),
}
b.ResetTimer()
for i := 0; i < count; i++ {
n, err := drw.Write(chunk)
if err != nil {
b.Fatalf("unexpected error while writing chunk: %v", err)
}
if n != len(chunk) {
b.Errorf("write is not complete, expected: %d bytes, written: %d bytes", len(chunk), n)
}
}
err := drw.Close()
if err != nil {
b.Fatalf("unexpected error when closing deferredResponseWriter: %v", err)
}
res := mockResponseWriter.Result()
if res.StatusCode != http.StatusOK {
b.Fatalf("status code is not writtend properly, expected: 200, got: %d", res.StatusCode)
}
}
func BenchmarkChunkingGzip(b *testing.B) {
tests := []struct {
count int
size int
}{
{
count: 100,
size: 1_000,
},
{
count: 100,
size: 100_000,
},
{
count: 1_000,
size: 100_000,
},
{
count: 1_000,
size: 1_000_000,
},
{
count: 10_000,
size: 100_000,
},
{
count: 100_000,
size: 10_000,
},
{
count: 1,
size: 100_000,
},
{
count: 1,
size: 1_000_000,
},
{
count: 1,
size: 10_000_000,
},
{
count: 1,
size: 100_000_000,
},
{
count: 1,
size: 1_000_000_000,
},
}
for _, t := range tests {
b.Run(fmt.Sprintf("Count=%d/Size=%d", t.count, t.size), func(b *testing.B) {
chunk := []byte(rand2.String(t.size))
benchmarkChunkingGzip(b, t.count, chunk)
})
}
}