Merge pull request #71548 from smarterclayton/watch_converted

Support Table and PartialObjectMetadata on watch

Kubernetes-commit: 6f9bf5fe98bcc3b436fea4d6dd345a1502d20778
This commit is contained in:
Kubernetes Publisher 2019-03-19 22:42:22 -07:00
commit d5ae2df97a
17 changed files with 680 additions and 403 deletions

484
Godeps/Godeps.json generated

File diff suppressed because it is too large Load Diff

View File

@ -37,7 +37,7 @@ import (
"testing"
"time"
"github.com/emicklei/go-restful"
restful "github.com/emicklei/go-restful"
fuzzer "k8s.io/apimachinery/pkg/api/apitesting/fuzzer"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
@ -51,6 +51,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -64,6 +65,7 @@ import (
"k8s.io/apiserver/pkg/audit"
auditpolicy "k8s.io/apiserver/pkg/audit/policy"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
genericapitesting "k8s.io/apiserver/pkg/endpoints/testing"
@ -1804,6 +1806,10 @@ func TestGetTable(t *testing.T) {
accept: runtime.ContentTypeJSON + ";as=Table;v=v1;g=meta.k8s.io",
statusCode: http.StatusNotAcceptable,
},
{
accept: runtime.ContentTypeProtobuf + ";as=Table;v=v1beta1;g=meta.k8s.io",
statusCode: http.StatusNotAcceptable,
},
{
item: true,
accept: runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
@ -1819,6 +1825,24 @@ func TestGetTable(t *testing.T) {
},
},
},
{
item: true,
accept: strings.Join([]string{
runtime.ContentTypeProtobuf + ";as=Table;v=v1beta1;g=meta.k8s.io",
runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
}, ","),
expected: &metav1beta1.Table{
TypeMeta: metav1.TypeMeta{Kind: "Table", APIVersion: "meta.k8s.io/v1beta1"},
ListMeta: metav1.ListMeta{ResourceVersion: "10", SelfLink: "/blah"},
ColumnDefinitions: []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metaDoc["name"]},
{Name: "Created At", Type: "date", Description: metaDoc["creationTimestamp"]},
},
Rows: []metav1beta1.TableRow{
{Cells: []interface{}{"foo1", now.Time.UTC().Format(time.RFC3339)}, Object: runtime.RawExtension{Raw: encodedBody}},
},
},
},
{
item: true,
accept: runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
@ -1918,6 +1942,216 @@ func TestGetTable(t *testing.T) {
}
}
func TestWatchTable(t *testing.T) {
obj := genericapitesting.Simple{
ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "ns1", ResourceVersion: "10", SelfLink: "/blah", CreationTimestamp: metav1.NewTime(time.Unix(1, 0)), UID: types.UID("abcdef0123")},
Other: "foo",
}
m, err := meta.Accessor(&obj)
if err != nil {
t.Fatal(err)
}
partial := meta.AsPartialObjectMetadata(m)
partial.GetObjectKind().SetGroupVersionKind(metav1beta1.SchemeGroupVersion.WithKind("PartialObjectMetadata"))
encodedBody, err := runtime.Encode(metainternalversion.Codecs.LegacyCodec(metav1beta1.SchemeGroupVersion), partial)
if err != nil {
t.Fatal(err)
}
// the codec includes a trailing newline that is not present during decode
encodedBody = bytes.TrimSpace(encodedBody)
metaDoc := metav1.ObjectMeta{}.SwaggerDoc()
s := metainternalversion.Codecs.SupportedMediaTypes()[0].Serializer
tests := []struct {
accept string
params url.Values
send func(w *watch.FakeWatcher)
expected []*metav1.WatchEvent
contentType string
statusCode int
item bool
}{
{
accept: runtime.ContentTypeJSON + ";as=Table;v=v1;g=meta.k8s.io",
statusCode: http.StatusNotAcceptable,
},
{
accept: runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
send: func(w *watch.FakeWatcher) {
w.Add(&obj)
},
expected: []*metav1.WatchEvent{
{
Type: "ADDED",
Object: runtime.RawExtension{
Raw: []byte(strings.TrimSpace(runtime.EncodeOrDie(s, &metav1beta1.Table{
TypeMeta: metav1.TypeMeta{Kind: "Table", APIVersion: "meta.k8s.io/v1beta1"},
ListMeta: metav1.ListMeta{ResourceVersion: "10", SelfLink: "/blah"},
ColumnDefinitions: []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metaDoc["name"]},
{Name: "Created At", Type: "date", Description: metaDoc["creationTimestamp"]},
},
Rows: []metav1beta1.TableRow{
{Cells: []interface{}{"foo1", time.Unix(1, 0).UTC().Format(time.RFC3339)}, Object: runtime.RawExtension{Raw: encodedBody}},
},
}))),
},
},
},
},
{
accept: runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
send: func(w *watch.FakeWatcher) {
w.Add(&obj)
w.Modify(&obj)
},
expected: []*metav1.WatchEvent{
{
Type: "ADDED",
Object: runtime.RawExtension{
Raw: []byte(strings.TrimSpace(runtime.EncodeOrDie(s, &metav1beta1.Table{
TypeMeta: metav1.TypeMeta{Kind: "Table", APIVersion: "meta.k8s.io/v1beta1"},
ListMeta: metav1.ListMeta{ResourceVersion: "10", SelfLink: "/blah"},
ColumnDefinitions: []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metaDoc["name"]},
{Name: "Created At", Type: "date", Description: metaDoc["creationTimestamp"]},
},
Rows: []metav1beta1.TableRow{
{Cells: []interface{}{"foo1", time.Unix(1, 0).UTC().Format(time.RFC3339)}, Object: runtime.RawExtension{Raw: encodedBody}},
},
}))),
},
},
{
Type: "MODIFIED",
Object: runtime.RawExtension{
Raw: []byte(strings.TrimSpace(runtime.EncodeOrDie(s, &metav1beta1.Table{
TypeMeta: metav1.TypeMeta{Kind: "Table", APIVersion: "meta.k8s.io/v1beta1"},
ListMeta: metav1.ListMeta{ResourceVersion: "10", SelfLink: "/blah"},
Rows: []metav1beta1.TableRow{
{Cells: []interface{}{"foo1", time.Unix(1, 0).UTC().Format(time.RFC3339)}, Object: runtime.RawExtension{Raw: encodedBody}},
},
}))),
},
},
},
},
}
for i, test := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
item: obj,
list: []genericapitesting.Simple{obj},
}
selfLinker := &setTestSelfLinker{
t: t,
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple",
namespace: "default",
}
if test.item {
selfLinker.expectedSet += "/id"
selfLinker.name = "id"
}
storage["simple"] = &simpleStorage
handler := handleLinker(storage, selfLinker)
server := httptest.NewServer(handler)
defer server.Close()
var id string
if test.item {
id = "/id"
}
u, err := url.Parse(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple")
if err != nil {
t.Fatal(err)
}
if test.params == nil {
test.params = url.Values{}
}
if test.item {
test.params["fieldSelector"] = []string{fmt.Sprintf("metadata.name=%s", id)}
}
test.params["watch"] = []string{"1"}
u.RawQuery = test.params.Encode()
req := &http.Request{Method: "GET", URL: u}
req.Header = http.Header{}
req.Header.Set("Accept", test.accept)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
if test.statusCode != 0 {
if resp.StatusCode != test.statusCode {
t.Fatalf("%d: unexpected response: %#v", i, resp)
}
obj, _, err := extractBodyObject(resp, unstructured.UnstructuredJSONScheme)
if err != nil {
t.Fatalf("%d: unexpected body read error: %v", i, err)
}
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Status"}
if obj.GetObjectKind().GroupVersionKind() != gvk {
t.Fatalf("%d: unexpected error body: %#v", i, obj)
}
return
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("%d: unexpected response: %#v", i, resp)
}
go func() {
defer simpleStorage.fakeWatch.Stop()
test.send(simpleStorage.fakeWatch)
}()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
t.Logf("Body:\n%s", string(body))
d := watcher(resp.Header.Get("Content-Type"), ioutil.NopCloser(bytes.NewReader(body)))
var actual []*metav1.WatchEvent
for {
var event metav1.WatchEvent
_, _, err := d.Decode(nil, &event)
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
actual = append(actual, &event)
}
if !reflect.DeepEqual(test.expected, actual) {
for i := range test.expected {
if i >= len(actual) {
break
}
t.Logf("%s", diff.StringDiff(string(test.expected[i].Object.Raw), string(actual[i].Object.Raw)))
}
t.Fatalf("unexpected: %s", diff.ObjectReflectDiff(test.expected, actual))
}
})
}
}
func watcher(mediaType string, r io.ReadCloser) streaming.Decoder {
info, ok := runtime.SerializerInfoForMediaType(metainternalversion.Codecs.SupportedMediaTypes(), mediaType)
if !ok || info.StreamSerializer == nil {
panic(info)
}
streamSerializer := info.StreamSerializer
fr := streamSerializer.Framer.NewFrameReader(r)
d := streaming.NewDecoder(fr, streamSerializer.Serializer)
return d
}
func TestGetPartialObjectMetadata(t *testing.T) {
now := metav1.Time{metav1.Now().Rfc3339Copy().Local()}
storage := map[string]rest.Storage{}
@ -3618,7 +3852,7 @@ func (obj *UnregisteredAPIObject) DeepCopyObject() runtime.Object {
func TestWriteJSONDecodeError(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(codecs, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"})
responsewriters.WriteObjectNegotiated(codecs, negotiation.DefaultEndpointRestrictions, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"})
}))
defer server.Close()
// Decode error response behavior is dictated by

View File

@ -69,5 +69,5 @@ func (s *APIGroupHandler) handle(req *restful.Request, resp *restful.Response) {
}
func (s *APIGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, w, req, http.StatusOK, &s.group)
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, &s.group)
}

View File

@ -72,5 +72,5 @@ func (s *legacyRootAPIHandler) handle(req *restful.Request, resp *restful.Respon
Versions: []string{"v1"},
}
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions)
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions)
}

View File

@ -111,7 +111,7 @@ func (s *rootAPIsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
groups[i].ServerAddressByClientCIDRs = serverCIDR
}
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups})
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups})
}
func (s *rootAPIsHandler) restfulHandle(req *restful.Request, resp *restful.Response) {

View File

@ -78,6 +78,6 @@ func (s *APIVersionHandler) handle(req *restful.Request, resp *restful.Response)
}
func (s *APIVersionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, w, req, http.StatusOK,
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK,
&metav1.APIResourceList{GroupVersion: s.groupVersion.String(), APIResources: s.apiResourceLister.ListAPIResources()})
}

View File

@ -257,7 +257,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
}
requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, req, w, timeout)
serveWatch(watcher, scope, outputMediaType, req, w, timeout)
})
return
}

View File

@ -47,6 +47,34 @@ func (e errNotAcceptable) Status() metav1.Status {
}
}
// errNotAcceptableConversion indicates Accept negotiation has failed specifically
// for a conversion to a known type.
type errNotAcceptableConversion struct {
target string
accepted []string
}
// NewNotAcceptableConversionError returns an error indicating that the desired
// API transformation to the target group version kind string is not accepted and
// only the listed mime types are allowed. This is temporary while Table does not
// yet support protobuf encoding.
func NewNotAcceptableConversionError(target string, accepted []string) error {
return errNotAcceptableConversion{target, accepted}
}
func (e errNotAcceptableConversion) Error() string {
return fmt.Sprintf("only the following media types are accepted when converting to %s: %v", e.target, strings.Join(e.accepted, ", "))
}
func (e errNotAcceptableConversion) Status() metav1.Status {
return metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusNotAcceptable,
Reason: metav1.StatusReasonNotAcceptable,
Message: e.Error(),
}
}
// errUnsupportedMediaType indicates Content-Type is not recognized
type errUnsupportedMediaType struct {
accepted []string

View File

@ -56,15 +56,9 @@ func NegotiateOutputMediaType(req *http.Request, ns runtime.NegotiatedSerializer
return mediaType, info, nil
}
// NegotiateOutputSerializer returns a serializer for the output.
func NegotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
_, info, err := NegotiateOutputMediaType(req, ns, DefaultEndpointRestrictions)
return info, err
}
// NegotiateOutputStreamSerializer returns a stream serializer for the given request.
func NegotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
mediaType, ok := NegotiateMediaTypeOptions(req.Header.Get("Accept"), AcceptedMediaTypesForEndpoint(ns), DefaultEndpointRestrictions)
// NegotiateOutputMediaTypeStream returns a stream serializer for the given request.
func NegotiateOutputMediaTypeStream(req *http.Request, ns runtime.NegotiatedSerializer, restrictions EndpointRestrictions) (runtime.SerializerInfo, error) {
mediaType, ok := NegotiateMediaTypeOptions(req.Header.Get("Accept"), AcceptedMediaTypesForEndpoint(ns), restrictions)
if !ok || mediaType.Accepted.Serializer.StreamSerializer == nil {
_, supported := MediaTypesForSerializer(ns)
return runtime.SerializerInfo{}, NewNotAcceptableError(supported)
@ -124,7 +118,7 @@ func isPrettyPrint(req *http.Request) bool {
type EndpointRestrictions interface {
// AllowsConversion should return true if the specified group version kind
// is an allowed target object.
AllowsConversion(schema.GroupVersionKind) bool
AllowsConversion(target schema.GroupVersionKind, mimeType, mimeSubType string) bool
// AllowsServerVersion should return true if the specified version is valid
// for the server group.
AllowsServerVersion(version string) bool
@ -139,9 +133,11 @@ var DefaultEndpointRestrictions = emptyEndpointRestrictions{}
type emptyEndpointRestrictions struct{}
func (emptyEndpointRestrictions) AllowsConversion(schema.GroupVersionKind) bool { return false }
func (emptyEndpointRestrictions) AllowsServerVersion(string) bool { return false }
func (emptyEndpointRestrictions) AllowsStreamSchema(s string) bool { return s == "watch" }
func (emptyEndpointRestrictions) AllowsConversion(schema.GroupVersionKind, string, string) bool {
return false
}
func (emptyEndpointRestrictions) AllowsServerVersion(string) bool { return false }
func (emptyEndpointRestrictions) AllowsStreamSchema(s string) bool { return s == "watch" }
// AcceptedMediaType contains information about a valid media type that the
// server can serialize.
@ -240,7 +236,7 @@ func acceptMediaTypeOptions(params map[string]string, accepts *AcceptedMediaType
}
}
if options.Convert != nil && !endpoint.AllowsConversion(*options.Convert) {
if options.Convert != nil && !endpoint.AllowsConversion(*options.Convert, accepts.Type, accepts.SubType) {
return MediaTypeOptions{}, false
}

View File

@ -231,7 +231,7 @@ func TestNegotiate(t *testing.T) {
req = &http.Request{Header: http.Header{}}
req.Header.Set("Accept", test.accept)
}
s, err := NegotiateOutputSerializer(req, test.ns)
_, s, err := NegotiateOutputMediaType(req, test.ns, DefaultEndpointRestrictions)
switch {
case err == nil && test.errFn != nil:
t.Errorf("%d: failed: expected error", i)

View File

@ -26,86 +26,97 @@ import (
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
)
// transformObject takes the object as returned by storage and ensures it is in
// the client's desired form, as well as ensuring any API level fields like self-link
// are properly set.
func transformObject(ctx context.Context, obj runtime.Object, opts interface{}, mediaType negotiation.MediaTypeOptions, scope RequestScope, req *http.Request) (runtime.Object, error) {
if _, ok := obj.(*metav1.Status); ok {
return obj, nil
}
if err := setObjectSelfLink(ctx, obj, req, scope.Namer); err != nil {
return nil, err
}
switch target := mediaType.Convert; {
case target == nil:
return obj, nil
case target.Kind == "PartialObjectMetadata" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
return asV1Beta1PartialObjectMetadata(obj)
case target.Kind == "PartialObjectMetadataList" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
return asV1Beta1PartialObjectMetadataList(obj)
case target.Kind == "Table" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
options, ok := opts.(*metav1beta1.TableOptions)
if !ok {
return nil, fmt.Errorf("unexpected TableOptions, got %T", opts)
}
return asV1Beta1Table(ctx, obj, options, scope)
default:
accepted, _ := negotiation.MediaTypesForSerializer(metainternalversion.Codecs)
err := negotiation.NewNotAcceptableError(accepted)
return nil, err
}
}
// optionsForTransform will load and validate any additional query parameter options for
// a conversion or return an error.
func optionsForTransform(mediaType negotiation.MediaTypeOptions, req *http.Request) (interface{}, error) {
switch target := mediaType.Convert; {
case target == nil:
case target.Kind == "Table" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
opts := &metav1beta1.TableOptions{}
if err := metav1beta1.ParameterCodec.DecodeParameters(req.URL.Query(), metav1beta1.SchemeGroupVersion, opts); err != nil {
return nil, err
}
switch errs := validation.ValidateTableOptions(opts); len(errs) {
case 0:
return opts, nil
case 1:
return nil, errors.NewBadRequest(fmt.Sprintf("Unable to convert to Table as requested: %v", errs[0].Error()))
default:
return nil, errors.NewBadRequest(fmt.Sprintf("Unable to convert to Table as requested: %v", errs))
}
}
return nil, nil
}
// targetEncodingForTransform returns the appropriate serializer for the input media type
func targetEncodingForTransform(scope *RequestScope, mediaType negotiation.MediaTypeOptions, req *http.Request) (schema.GroupVersionKind, runtime.NegotiatedSerializer, bool) {
switch target := mediaType.Convert; {
case target == nil:
case target.Kind == "PartialObjectMetadata" && target.GroupVersion() == metav1beta1.SchemeGroupVersion,
target.Kind == "PartialObjectMetadataList" && target.GroupVersion() == metav1beta1.SchemeGroupVersion,
target.Kind == "Table" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
return *target, metainternalversion.Codecs, true
}
return scope.Kind, scope.Serializer, false
}
// transformResponseObject takes an object loaded from storage and performs any necessary transformations.
// Will write the complete response object.
func transformResponseObject(ctx context.Context, scope RequestScope, req *http.Request, w http.ResponseWriter, statusCode int, mediaType negotiation.MediaTypeOptions, result runtime.Object) {
// status objects are ignored for transformation
if _, ok := result.(*metav1.Status); ok {
responsewriters.WriteObject(statusCode, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
return
}
// ensure the self link and empty list array are set
if err := setObjectSelfLink(ctx, result, req, scope.Namer); err != nil {
options, err := optionsForTransform(mediaType, req)
if err != nil {
scope.err(err, w, req)
return
}
trace := scope.Trace
// If conversion was allowed by the scope, perform it before writing the response
switch target := mediaType.Convert; {
case target == nil:
trace.Step("Writing response")
responsewriters.WriteObject(statusCode, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
case target.Kind == "PartialObjectMetadata" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
partial, err := asV1Beta1PartialObjectMetadata(result)
if err != nil {
scope.err(err, w, req)
return
}
if err := writeMetaInternalVersion(partial, statusCode, w, req, &scope, target.GroupVersion()); err != nil {
scope.err(err, w, req)
return
}
case target.Kind == "PartialObjectMetadataList" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
trace.Step("Processing list items")
partial, err := asV1Beta1PartialObjectMetadataList(result)
if err != nil {
scope.err(err, w, req)
return
}
if err := writeMetaInternalVersion(partial, statusCode, w, req, &scope, target.GroupVersion()); err != nil {
scope.err(err, w, req)
return
}
case target.Kind == "Table" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
opts := &metav1beta1.TableOptions{}
trace.Step("Decoding parameters")
if err := metav1beta1.ParameterCodec.DecodeParameters(req.URL.Query(), metav1beta1.SchemeGroupVersion, opts); err != nil {
scope.err(err, w, req)
return
}
table, err := asV1Beta1Table(ctx, result, opts, scope)
if err != nil {
scope.err(err, w, req)
return
}
if err := writeMetaInternalVersion(table, statusCode, w, req, &scope, target.GroupVersion()); err != nil {
scope.err(err, w, req)
return
}
default:
// this block should only be hit if scope AllowsConversion is incorrect
accepted, _ := negotiation.MediaTypesForSerializer(metainternalversion.Codecs)
err := negotiation.NewNotAcceptableError(accepted)
obj, err := transformObject(ctx, result, options, mediaType, scope, req)
if err != nil {
scope.err(err, w, req)
return
}
kind, serializer, _ := targetEncodingForTransform(&scope, mediaType, req)
responsewriters.WriteObjectNegotiated(serializer, &scope, kind.GroupVersion(), w, req, statusCode, obj)
}
// errNotAcceptable indicates Accept negotiation has failed
@ -131,15 +142,11 @@ func (e errNotAcceptable) Status() metav1.Status {
}
func asV1Beta1Table(ctx context.Context, result runtime.Object, opts *metav1beta1.TableOptions, scope RequestScope) (runtime.Object, error) {
trace := scope.Trace
trace.Step("Converting to table")
table, err := scope.TableConvertor.ConvertToTable(ctx, result, opts)
if err != nil {
return nil, err
}
trace.Step("Processing rows")
for i := range table.Rows {
item := &table.Rows[i]
switch opts.IncludeObject {
@ -161,7 +168,6 @@ func asV1Beta1Table(ctx context.Context, result runtime.Object, opts *metav1beta
case metav1beta1.IncludeNone:
item.Object.Object = nil
default:
// TODO: move this to validation on the table options?
err = errors.NewBadRequest(fmt.Sprintf("unrecognized includeObject value: %q", opts.IncludeObject))
return nil, err
}
@ -172,7 +178,6 @@ func asV1Beta1Table(ctx context.Context, result runtime.Object, opts *metav1beta
func asV1Beta1PartialObjectMetadata(result runtime.Object) (runtime.Object, error) {
if meta.IsListType(result) {
// TODO: this should be calculated earlier
err := newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadata, but the requested object is a list (%T)", result))
return nil, err
}
@ -187,7 +192,6 @@ func asV1Beta1PartialObjectMetadata(result runtime.Object) (runtime.Object, erro
func asV1Beta1PartialObjectMetadataList(result runtime.Object) (runtime.Object, error) {
if !meta.IsListType(result) {
// TODO: this should be calculated earlier
return nil, newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadataList, but the requested object is not a list (%T)", result))
}
list := &metav1beta1.PartialObjectMetadataList{}
@ -206,14 +210,3 @@ func asV1Beta1PartialObjectMetadataList(result runtime.Object) (runtime.Object,
}
return list, nil
}
func writeMetaInternalVersion(obj runtime.Object, statusCode int, w http.ResponseWriter, req *http.Request, restrictions negotiation.EndpointRestrictions, target schema.GroupVersion) error {
// renegotiate under the internal version
_, info, err := negotiation.NegotiateOutputMediaType(req, metainternalversion.Codecs, restrictions)
if err != nil {
return err
}
encoder := metainternalversion.Codecs.EncoderForVersion(info.Serializer, target)
responsewriters.SerializeObject(info.MediaType, encoder, w, req, statusCode, obj)
return nil
}

View File

@ -55,23 +55,6 @@ func (w httpResponseWriterWithInit) Write(b []byte) (n int, err error) {
return w.innerW.Write(b)
}
// WriteObject renders a returned runtime.Object to the response as a stream or an encoded object. If the object
// returned by the response implements rest.ResourceStreamer that interface will be used to render the
// response. The Accept header and current API version will be passed in, and the output will be copied
// directly to the response body. If content type is returned it is used, otherwise the content type will
// be "application/octet-stream". All other objects are sent to standard JSON serialization.
func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) {
stream, ok := object.(rest.ResourceStreamer)
if ok {
requestInfo, _ := request.RequestInfoFrom(req.Context())
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
StreamObject(statusCode, gv, s, stream, w, req)
})
return
}
WriteObjectNegotiated(s, gv, w, req, statusCode, object)
}
// StreamObject performs input stream negotiation from a ResourceStreamer and writes that to the response.
// If the client requests a websocket upgrade, negotiate for a websocket reader protocol (because many
// browser clients cannot easily handle binary streaming protocols).
@ -123,9 +106,17 @@ func SerializeObject(mediaType string, encoder runtime.Encoder, innerW http.Resp
}
// WriteObjectNegotiated renders an object in the content type negotiated by the client.
// The context is optional and can be nil.
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
serializer, err := negotiation.NegotiateOutputSerializer(req, s)
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiation.EndpointRestrictions, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
stream, ok := object.(rest.ResourceStreamer)
if ok {
requestInfo, _ := request.RequestInfoFrom(req.Context())
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
StreamObject(statusCode, gv, s, stream, w, req)
})
return
}
_, serializer, err := negotiation.NegotiateOutputMediaType(req, s, restrictions)
if err != nil {
// if original statusCode was not successful we need to return the original error
// we cannot hide it behind negotiation problems
@ -162,7 +153,7 @@ func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupV
return code
}
WriteObjectNegotiated(s, gv, w, req, code, status)
WriteObjectNegotiated(s, negotiation.DefaultEndpointRestrictions, gv, w, req, code, status)
return code
}

View File

@ -79,12 +79,14 @@ func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Reque
responsewriters.ErrorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), w, req)
}
func (scope *RequestScope) AllowsConversion(gvk schema.GroupVersionKind) bool {
func (scope *RequestScope) AllowsConversion(gvk schema.GroupVersionKind, mimeType, mimeSubType string) bool {
// TODO: this is temporary, replace with an abstraction calculated at endpoint installation time
if gvk.GroupVersion() == metav1beta1.SchemeGroupVersion {
switch gvk.Kind {
case "Table":
return scope.TableConvertor != nil
return scope.TableConvertor != nil &&
mimeType == "application" &&
(mimeSubType == "json" || mimeSubType == "yaml")
case "PartialObjectMetadata", "PartialObjectMetadataList":
// TODO: should delineate between lists and non-list endpoints
return true
@ -172,7 +174,7 @@ type responder struct {
}
func (r *responder) Object(statusCode int, obj runtime.Object) {
responsewriters.WriteObject(statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.w, r.req)
responsewriters.WriteObjectNegotiated(r.scope.Serializer, &r.scope, r.scope.Kind.GroupVersion(), r.w, r.req, statusCode, obj)
}
func (r *responder) Error(err error) {

View File

@ -25,13 +25,13 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/httplog"
"k8s.io/apiserver/pkg/util/wsstream"
@ -61,42 +61,56 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
return t.C, t.Stop
}
// serveWatch handles serving requests to the server
// serveWatch will serve a watch response.
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
func serveWatch(watcher watch.Interface, scope RequestScope, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
// negotiate for the stream serializer
serializer, err := negotiation.NegotiateOutputStreamSerializer(req, scope.Serializer)
func serveWatch(watcher watch.Interface, scope RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
options, err := optionsForTransform(mediaTypeOptions, req)
if err != nil {
scope.err(err, w, req)
return
}
// negotiate for the stream serializer from the scope's serializer
serializer, err := negotiation.NegotiateOutputMediaTypeStream(req, scope.Serializer, &scope)
if err != nil {
scope.err(err, w, req)
return
}
framer := serializer.StreamSerializer.Framer
streamSerializer := serializer.StreamSerializer.Serializer
embedded := serializer.Serializer
encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion())
useTextFraming := serializer.EncodesAsText
if framer == nil {
scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), w, req)
return
}
encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion())
useTextFraming := serializer.EncodesAsText
// find the embedded serializer matching the media type
embeddedEncoder := scope.Serializer.EncoderForVersion(embedded, scope.Kind.GroupVersion())
// TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here
mediaType := serializer.MediaType
if mediaType != runtime.ContentTypeJSON {
mediaType += ";stream=watch"
}
ctx := req.Context()
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
scope.err(fmt.Errorf("missing requestInfo"), w, req)
return
// locate the appropriate embedded encoder based on the transform
var embeddedEncoder runtime.Encoder
contentKind, contentSerializer, transform := targetEncodingForTransform(&scope, mediaTypeOptions, req)
if transform {
var embedded runtime.Serializer
for _, supported := range contentSerializer.SupportedMediaTypes() {
if supported.MediaType == serializer.MediaType {
embedded = supported.Serializer
}
}
if embedded == nil {
scope.err(fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer), w, req)
return
}
embeddedEncoder = contentSerializer.EncoderForVersion(embedded, contentKind.GroupVersion())
} else {
embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion())
}
ctx := req.Context()
server := &WatchServer{
Watching: watcher,
Scope: scope,
@ -106,10 +120,20 @@ func serveWatch(watcher watch.Interface, scope RequestScope, req *http.Request,
Framer: framer,
Encoder: encoder,
EmbeddedEncoder: embeddedEncoder,
Fixup: func(obj runtime.Object) {
if err := setSelfLink(obj, requestInfo, scope.Namer); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to set link for object %v: %v", reflect.TypeOf(obj), err))
Fixup: func(obj runtime.Object) runtime.Object {
result, err := transformObject(ctx, obj, options, mediaTypeOptions, scope, req)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err))
return obj
}
// When we are transformed to a table, use the table options as the state for whether we
// should print headers - on watch, we only want to print table headers on the first object
// and omit them on subsequent events.
if tableOptions, ok := options.(*metav1beta1.TableOptions); ok {
tableOptions.NoHeaders = true
}
return result
},
TimeoutFactory: &realTimeoutFactory{timeout},
@ -133,7 +157,8 @@ type WatchServer struct {
Encoder runtime.Encoder
// used to encode the nested object in the watch stream
EmbeddedEncoder runtime.Encoder
Fixup func(runtime.Object)
// used to correct the object before we send it to the serializer
Fixup func(runtime.Object) runtime.Object
TimeoutFactory TimeoutFactory
}
@ -205,8 +230,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
obj := event.Object
s.Fixup(obj)
obj := s.Fixup(event.Object)
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
@ -272,8 +296,7 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
// End of results.
return
}
obj := event.Object
s.Fixup(obj)
obj := s.Fixup(event.Object)
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))

View File

@ -586,7 +586,7 @@ func TestWatchHTTPErrors(t *testing.T) {
Encoder: newCodec,
EmbeddedEncoder: newCodec,
Fixup: func(obj runtime.Object) {},
Fixup: func(obj runtime.Object) runtime.Object { return obj },
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
}
@ -646,7 +646,7 @@ func TestWatchHTTPDynamicClientErrors(t *testing.T) {
Encoder: newCodec,
EmbeddedEncoder: newCodec,
Fixup: func(obj runtime.Object) {},
Fixup: func(obj runtime.Object) runtime.Object { return obj },
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
}
@ -708,7 +708,7 @@ func TestWatchHTTPTimeout(t *testing.T) {
Encoder: newCodec,
EmbeddedEncoder: newCodec,
Fixup: func(obj runtime.Object) {},
Fixup: func(obj runtime.Object) runtime.Object { return obj },
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
}

View File

@ -18,6 +18,7 @@ package resttest
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
@ -1460,7 +1461,7 @@ func (t *Tester) testListTableConversion(obj runtime.Object, assignFn AssignFunc
}
columns := table.ColumnDefinitions
if len(columns) == 0 {
t.Errorf("unexpected number of columns: %v", len(columns))
t.Fatalf("unexpected number of columns: %v\n%#v", len(columns), columns)
}
if !strings.EqualFold(columns[0].Name, "Name") || columns[0].Type != "string" || columns[0].Format != "name" {
t.Errorf("expect column 0 to be the name column: %#v", columns[0])
@ -1505,8 +1506,11 @@ func (t *Tester) testListTableConversion(obj runtime.Object, assignFn AssignFunc
}
}
if len(row.Cells) != len(table.ColumnDefinitions) {
t.Fatalf("unmatched row length on row %d: %#v", i, row.Cells)
}
}
data, _ := json.MarshalIndent(table, "", " ")
t.Logf("%s", string(data))
}
// =============================================================================

View File

@ -73,9 +73,11 @@ func (c defaultTableConvertor) ConvertToTable(ctx context.Context, object runtim
table.SelfLink = m.GetSelfLink()
}
}
table.ColumnDefinitions = []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: swaggerMetadataDescriptions["name"]},
{Name: "Created At", Type: "date", Description: swaggerMetadataDescriptions["creationTimestamp"]},
if opt, ok := tableOptions.(*metav1beta1.TableOptions); !ok || !opt.NoHeaders {
table.ColumnDefinitions = []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: swaggerMetadataDescriptions["name"]},
{Name: "Created At", Type: "date", Description: swaggerMetadataDescriptions["creationTimestamp"]},
}
}
return &table, nil
}