deferredResponseWriter returns after calling the Close() method

previously all sorts of errors including a data race were possible because deferredResponseWriter resets the writer and returns it to the pool.

an attempt to write to a nil writer will lead to "invalid memory address or nil pointer dereference"
sharing the same instance of deferredResponseWriter might lead to "index out of range [43] with length 30" and "recovered from err index > windowEnd" errors

Kubernetes-commit: e6f98311d00f083c1b980ed7434d2e9769fa921f
This commit is contained in:
Lukasz Szaszkiewicz 2020-09-07 12:42:36 +02:00 committed by Kubernetes Publisher
parent 678d99e614
commit ac3c7faf5d
2 changed files with 94 additions and 5 deletions

View File

@ -96,9 +96,11 @@ func SerializeObject(mediaType string, encoder runtime.Encoder, hw http.Response
err := encoder.Encode(object, w)
if err == nil {
err = w.Close()
if err == nil {
return
if err != nil {
// we cannot write an error to the writer anymore as the Encode call was successful.
utilruntime.HandleError(fmt.Errorf("apiserver was unable to close cleanly the response writer: %v", err))
}
return
}
// make a best effort to write the object if a failure is detected

View File

@ -20,6 +20,7 @@ import (
"bytes"
"compress/gzip"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
@ -28,7 +29,7 @@ import (
"reflect"
"testing"
"k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
@ -37,6 +38,76 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
func TestSerializeObjectParallel(t *testing.T) {
largePayload := bytes.Repeat([]byte("0123456789abcdef"), defaultGzipThresholdBytes/16+1)
type test struct {
name string
compressionEnabled bool
mediaType string
out []byte
outErrs []error
req *http.Request
statusCode int
object runtime.Object
wantCode int
wantHeaders http.Header
wantBody []byte
}
newTest := func() test {
return test{
name: "compress on gzip",
compressionEnabled: true,
out: largePayload,
mediaType: "application/json",
req: &http.Request{Header: http.Header{
"Accept-Encoding": []string{"gzip"},
}},
wantCode: http.StatusOK,
wantHeaders: http.Header{
"Content-Type": []string{"application/json"},
"Content-Encoding": []string{"gzip"},
"Vary": []string{"Accept-Encoding"},
},
}
}
for i := 0; i < 100; i++ {
ctt := newTest()
t.Run(ctt.name, func(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatalf("recovered from err %v", r)
}
}()
t.Parallel()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIResponseCompression, ctt.compressionEnabled)()
encoder := &fakeEncoder{
buf: ctt.out,
errs: ctt.outErrs,
}
if ctt.statusCode == 0 {
ctt.statusCode = http.StatusOK
}
recorder := &fakeResponseRecorder{
ResponseRecorder: httptest.NewRecorder(),
fe: encoder,
errorAfterEncoding: true,
}
SerializeObject(ctt.mediaType, encoder, recorder, ctt.req, ctt.statusCode, ctt.object)
result := recorder.Result()
if result.StatusCode != ctt.wantCode {
t.Fatalf("unexpected code: %v", result.StatusCode)
}
if !reflect.DeepEqual(result.Header, ctt.wantHeaders) {
t.Fatal(diff.ObjectReflectDiff(ctt.wantHeaders, result.Header))
}
})
}
}
func TestSerializeObject(t *testing.T) {
smallPayload := []byte("{test-object,test-object}")
largePayload := bytes.Repeat([]byte("0123456789abcdef"), defaultGzipThresholdBytes/16+1)
@ -111,7 +182,7 @@ func TestSerializeObject(t *testing.T) {
{
name: "fail to encode object or status with status code",
out: smallPayload,
outErrs: []error{errors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")},
outErrs: []error{kerrors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")},
mediaType: "application/json",
req: &http.Request{Header: http.Header{}},
statusCode: http.StatusOK,
@ -123,7 +194,7 @@ func TestSerializeObject(t *testing.T) {
{
name: "fail to encode object or status with status code and keeps previous error",
out: smallPayload,
outErrs: []error{errors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")},
outErrs: []error{kerrors.NewNotFound(schema.GroupResource{}, "test"), fmt.Errorf("bad2")},
mediaType: "application/json",
req: &http.Request{Header: http.Header{}},
statusCode: http.StatusNotAcceptable,
@ -270,10 +341,25 @@ func TestSerializeObject(t *testing.T) {
}
}
type fakeResponseRecorder struct {
*httptest.ResponseRecorder
fe *fakeEncoder
errorAfterEncoding bool
}
func (frw *fakeResponseRecorder) Write(buf []byte) (int, error) {
if frw.errorAfterEncoding && frw.fe.encodeCalled {
return 0, errors.New("returning a requested error")
}
return frw.ResponseRecorder.Write(buf)
}
type fakeEncoder struct {
obj runtime.Object
buf []byte
errs []error
encodeCalled bool
}
func (e *fakeEncoder) Encode(obj runtime.Object, w io.Writer) error {
@ -284,6 +370,7 @@ func (e *fakeEncoder) Encode(obj runtime.Object, w io.Writer) error {
return err
}
_, err := w.Write(e.buf)
e.encodeCalled = true
return err
}