Support Table and PartialObjectMetadata on watch

Clean up the code paths that lead to objects being transformed and output with negotiation.
Remove some duplicate code that was not consistent. Now, watch will respond correctly to
Table and PartialObjectMetadata requests. Add unit and integration tests.

When transforming responses to Tables, only the first watch event for a given type will
include the columns. Columns will not change unless the watch is restarted.

Add a volume attachment printer and tighten up table validation error cases.

Disable protobuf from table conversion because Tables don't have protobuf because they
use `interface{}`

Kubernetes-commit: 3230a0b4fd14a6166f8362d4732e199e8779c426
This commit is contained in:
Clayton Coleman 2018-11-28 23:50:12 -05:00 committed by Kubernetes Publisher
parent 7845c0df69
commit 8d7b330c4c
17 changed files with 821 additions and 476 deletions

698
Godeps/Godeps.json generated

File diff suppressed because it is too large Load Diff

View File

@ -37,7 +37,7 @@ import (
"testing"
"time"
"github.com/emicklei/go-restful"
restful "github.com/emicklei/go-restful"
fuzzer "k8s.io/apimachinery/pkg/api/apitesting/fuzzer"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
@ -51,6 +51,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -64,6 +65,7 @@ import (
"k8s.io/apiserver/pkg/audit"
auditpolicy "k8s.io/apiserver/pkg/audit/policy"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
genericapitesting "k8s.io/apiserver/pkg/endpoints/testing"
@ -1804,6 +1806,10 @@ func TestGetTable(t *testing.T) {
accept: runtime.ContentTypeJSON + ";as=Table;v=v1;g=meta.k8s.io",
statusCode: http.StatusNotAcceptable,
},
{
accept: runtime.ContentTypeProtobuf + ";as=Table;v=v1beta1;g=meta.k8s.io",
statusCode: http.StatusNotAcceptable,
},
{
item: true,
accept: runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
@ -1819,6 +1825,24 @@ func TestGetTable(t *testing.T) {
},
},
},
{
item: true,
accept: strings.Join([]string{
runtime.ContentTypeProtobuf + ";as=Table;v=v1beta1;g=meta.k8s.io",
runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
}, ","),
expected: &metav1beta1.Table{
TypeMeta: metav1.TypeMeta{Kind: "Table", APIVersion: "meta.k8s.io/v1beta1"},
ListMeta: metav1.ListMeta{ResourceVersion: "10", SelfLink: "/blah"},
ColumnDefinitions: []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metaDoc["name"]},
{Name: "Created At", Type: "date", Description: metaDoc["creationTimestamp"]},
},
Rows: []metav1beta1.TableRow{
{Cells: []interface{}{"foo1", now.Time.UTC().Format(time.RFC3339)}, Object: runtime.RawExtension{Raw: encodedBody}},
},
},
},
{
item: true,
accept: runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
@ -1918,6 +1942,216 @@ func TestGetTable(t *testing.T) {
}
}
func TestWatchTable(t *testing.T) {
obj := genericapitesting.Simple{
ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "ns1", ResourceVersion: "10", SelfLink: "/blah", CreationTimestamp: metav1.NewTime(time.Unix(1, 0)), UID: types.UID("abcdef0123")},
Other: "foo",
}
m, err := meta.Accessor(&obj)
if err != nil {
t.Fatal(err)
}
partial := meta.AsPartialObjectMetadata(m)
partial.GetObjectKind().SetGroupVersionKind(metav1beta1.SchemeGroupVersion.WithKind("PartialObjectMetadata"))
encodedBody, err := runtime.Encode(metainternalversion.Codecs.LegacyCodec(metav1beta1.SchemeGroupVersion), partial)
if err != nil {
t.Fatal(err)
}
// the codec includes a trailing newline that is not present during decode
encodedBody = bytes.TrimSpace(encodedBody)
metaDoc := metav1.ObjectMeta{}.SwaggerDoc()
s := metainternalversion.Codecs.SupportedMediaTypes()[0].Serializer
tests := []struct {
accept string
params url.Values
send func(w *watch.FakeWatcher)
expected []*metav1.WatchEvent
contentType string
statusCode int
item bool
}{
{
accept: runtime.ContentTypeJSON + ";as=Table;v=v1;g=meta.k8s.io",
statusCode: http.StatusNotAcceptable,
},
{
accept: runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
send: func(w *watch.FakeWatcher) {
w.Add(&obj)
},
expected: []*metav1.WatchEvent{
{
Type: "ADDED",
Object: runtime.RawExtension{
Raw: []byte(strings.TrimSpace(runtime.EncodeOrDie(s, &metav1beta1.Table{
TypeMeta: metav1.TypeMeta{Kind: "Table", APIVersion: "meta.k8s.io/v1beta1"},
ListMeta: metav1.ListMeta{ResourceVersion: "10", SelfLink: "/blah"},
ColumnDefinitions: []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metaDoc["name"]},
{Name: "Created At", Type: "date", Description: metaDoc["creationTimestamp"]},
},
Rows: []metav1beta1.TableRow{
{Cells: []interface{}{"foo1", time.Unix(1, 0).UTC().Format(time.RFC3339)}, Object: runtime.RawExtension{Raw: encodedBody}},
},
}))),
},
},
},
},
{
accept: runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
send: func(w *watch.FakeWatcher) {
w.Add(&obj)
w.Modify(&obj)
},
expected: []*metav1.WatchEvent{
{
Type: "ADDED",
Object: runtime.RawExtension{
Raw: []byte(strings.TrimSpace(runtime.EncodeOrDie(s, &metav1beta1.Table{
TypeMeta: metav1.TypeMeta{Kind: "Table", APIVersion: "meta.k8s.io/v1beta1"},
ListMeta: metav1.ListMeta{ResourceVersion: "10", SelfLink: "/blah"},
ColumnDefinitions: []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metaDoc["name"]},
{Name: "Created At", Type: "date", Description: metaDoc["creationTimestamp"]},
},
Rows: []metav1beta1.TableRow{
{Cells: []interface{}{"foo1", time.Unix(1, 0).UTC().Format(time.RFC3339)}, Object: runtime.RawExtension{Raw: encodedBody}},
},
}))),
},
},
{
Type: "MODIFIED",
Object: runtime.RawExtension{
Raw: []byte(strings.TrimSpace(runtime.EncodeOrDie(s, &metav1beta1.Table{
TypeMeta: metav1.TypeMeta{Kind: "Table", APIVersion: "meta.k8s.io/v1beta1"},
ListMeta: metav1.ListMeta{ResourceVersion: "10", SelfLink: "/blah"},
Rows: []metav1beta1.TableRow{
{Cells: []interface{}{"foo1", time.Unix(1, 0).UTC().Format(time.RFC3339)}, Object: runtime.RawExtension{Raw: encodedBody}},
},
}))),
},
},
},
},
}
for i, test := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
item: obj,
list: []genericapitesting.Simple{obj},
}
selfLinker := &setTestSelfLinker{
t: t,
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple",
namespace: "default",
}
if test.item {
selfLinker.expectedSet += "/id"
selfLinker.name = "id"
}
storage["simple"] = &simpleStorage
handler := handleLinker(storage, selfLinker)
server := httptest.NewServer(handler)
defer server.Close()
var id string
if test.item {
id = "/id"
}
u, err := url.Parse(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple")
if err != nil {
t.Fatal(err)
}
if test.params == nil {
test.params = url.Values{}
}
if test.item {
test.params["fieldSelector"] = []string{fmt.Sprintf("metadata.name=%s", id)}
}
test.params["watch"] = []string{"1"}
u.RawQuery = test.params.Encode()
req := &http.Request{Method: "GET", URL: u}
req.Header = http.Header{}
req.Header.Set("Accept", test.accept)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
if test.statusCode != 0 {
if resp.StatusCode != test.statusCode {
t.Fatalf("%d: unexpected response: %#v", i, resp)
}
obj, _, err := extractBodyObject(resp, unstructured.UnstructuredJSONScheme)
if err != nil {
t.Fatalf("%d: unexpected body read error: %v", i, err)
}
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Status"}
if obj.GetObjectKind().GroupVersionKind() != gvk {
t.Fatalf("%d: unexpected error body: %#v", i, obj)
}
return
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("%d: unexpected response: %#v", i, resp)
}
go func() {
defer simpleStorage.fakeWatch.Stop()
test.send(simpleStorage.fakeWatch)
}()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
t.Logf("Body:\n%s", string(body))
d := watcher(resp.Header.Get("Content-Type"), ioutil.NopCloser(bytes.NewReader(body)))
var actual []*metav1.WatchEvent
for {
var event metav1.WatchEvent
_, _, err := d.Decode(nil, &event)
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
actual = append(actual, &event)
}
if !reflect.DeepEqual(test.expected, actual) {
for i := range test.expected {
if i >= len(actual) {
break
}
t.Logf("%s", diff.StringDiff(string(test.expected[i].Object.Raw), string(actual[i].Object.Raw)))
}
t.Fatalf("unexpected: %s", diff.ObjectReflectDiff(test.expected, actual))
}
})
}
}
func watcher(mediaType string, r io.ReadCloser) streaming.Decoder {
info, ok := runtime.SerializerInfoForMediaType(metainternalversion.Codecs.SupportedMediaTypes(), mediaType)
if !ok || info.StreamSerializer == nil {
panic(info)
}
streamSerializer := info.StreamSerializer
fr := streamSerializer.Framer.NewFrameReader(r)
d := streaming.NewDecoder(fr, streamSerializer.Serializer)
return d
}
func TestGetPartialObjectMetadata(t *testing.T) {
now := metav1.Time{metav1.Now().Rfc3339Copy().Local()}
storage := map[string]rest.Storage{}
@ -3618,7 +3852,7 @@ func (obj *UnregisteredAPIObject) DeepCopyObject() runtime.Object {
func TestWriteJSONDecodeError(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(codecs, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"})
responsewriters.WriteObjectNegotiated(codecs, negotiation.DefaultEndpointRestrictions, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"})
}))
defer server.Close()
// Decode error response behavior is dictated by

View File

@ -69,5 +69,5 @@ func (s *APIGroupHandler) handle(req *restful.Request, resp *restful.Response) {
}
func (s *APIGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, w, req, http.StatusOK, &s.group)
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, &s.group)
}

View File

@ -72,5 +72,5 @@ func (s *legacyRootAPIHandler) handle(req *restful.Request, resp *restful.Respon
Versions: []string{"v1"},
}
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions)
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions)
}

View File

@ -111,7 +111,7 @@ func (s *rootAPIsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
groups[i].ServerAddressByClientCIDRs = serverCIDR
}
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups})
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups})
}
func (s *rootAPIsHandler) restfulHandle(req *restful.Request, resp *restful.Response) {

View File

@ -78,6 +78,6 @@ func (s *APIVersionHandler) handle(req *restful.Request, resp *restful.Response)
}
func (s *APIVersionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, w, req, http.StatusOK,
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK,
&metav1.APIResourceList{GroupVersion: s.groupVersion.String(), APIResources: s.apiResourceLister.ListAPIResources()})
}

View File

@ -257,7 +257,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
}
requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, req, w, timeout)
serveWatch(watcher, scope, outputMediaType, req, w, timeout)
})
return
}

View File

@ -47,6 +47,34 @@ func (e errNotAcceptable) Status() metav1.Status {
}
}
// errNotAcceptableConversion indicates Accept negotiation has failed specifically
// for a conversion to a known type.
type errNotAcceptableConversion struct {
target string
accepted []string
}
// NewNotAcceptableConversionError returns an error indicating that the desired
// API transformation to the target group version kind string is not accepted and
// only the listed mime types are allowed. This is temporary while Table does not
// yet support protobuf encoding.
func NewNotAcceptableConversionError(target string, accepted []string) error {
return errNotAcceptableConversion{target, accepted}
}
func (e errNotAcceptableConversion) Error() string {
return fmt.Sprintf("only the following media types are accepted when converting to %s: %v", e.target, strings.Join(e.accepted, ", "))
}
func (e errNotAcceptableConversion) Status() metav1.Status {
return metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusNotAcceptable,
Reason: metav1.StatusReasonNotAcceptable,
Message: e.Error(),
}
}
// errUnsupportedMediaType indicates Content-Type is not recognized
type errUnsupportedMediaType struct {
accepted []string

View File

@ -56,15 +56,9 @@ func NegotiateOutputMediaType(req *http.Request, ns runtime.NegotiatedSerializer
return mediaType, info, nil
}
// NegotiateOutputSerializer returns a serializer for the output.
func NegotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
_, info, err := NegotiateOutputMediaType(req, ns, DefaultEndpointRestrictions)
return info, err
}
// NegotiateOutputStreamSerializer returns a stream serializer for the given request.
func NegotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
mediaType, ok := NegotiateMediaTypeOptions(req.Header.Get("Accept"), AcceptedMediaTypesForEndpoint(ns), DefaultEndpointRestrictions)
// NegotiateOutputMediaTypeStream returns a stream serializer for the given request.
func NegotiateOutputMediaTypeStream(req *http.Request, ns runtime.NegotiatedSerializer, restrictions EndpointRestrictions) (runtime.SerializerInfo, error) {
mediaType, ok := NegotiateMediaTypeOptions(req.Header.Get("Accept"), AcceptedMediaTypesForEndpoint(ns), restrictions)
if !ok || mediaType.Accepted.Serializer.StreamSerializer == nil {
_, supported := MediaTypesForSerializer(ns)
return runtime.SerializerInfo{}, NewNotAcceptableError(supported)
@ -124,7 +118,7 @@ func isPrettyPrint(req *http.Request) bool {
type EndpointRestrictions interface {
// AllowsConversion should return true if the specified group version kind
// is an allowed target object.
AllowsConversion(schema.GroupVersionKind) bool
AllowsConversion(target schema.GroupVersionKind, mimeType, mimeSubType string) bool
// AllowsServerVersion should return true if the specified version is valid
// for the server group.
AllowsServerVersion(version string) bool
@ -139,9 +133,11 @@ var DefaultEndpointRestrictions = emptyEndpointRestrictions{}
type emptyEndpointRestrictions struct{}
func (emptyEndpointRestrictions) AllowsConversion(schema.GroupVersionKind) bool { return false }
func (emptyEndpointRestrictions) AllowsServerVersion(string) bool { return false }
func (emptyEndpointRestrictions) AllowsStreamSchema(s string) bool { return s == "watch" }
func (emptyEndpointRestrictions) AllowsConversion(schema.GroupVersionKind, string, string) bool {
return false
}
func (emptyEndpointRestrictions) AllowsServerVersion(string) bool { return false }
func (emptyEndpointRestrictions) AllowsStreamSchema(s string) bool { return s == "watch" }
// AcceptedMediaType contains information about a valid media type that the
// server can serialize.
@ -240,7 +236,7 @@ func acceptMediaTypeOptions(params map[string]string, accepts *AcceptedMediaType
}
}
if options.Convert != nil && !endpoint.AllowsConversion(*options.Convert) {
if options.Convert != nil && !endpoint.AllowsConversion(*options.Convert, accepts.Type, accepts.SubType) {
return MediaTypeOptions{}, false
}

View File

@ -231,7 +231,7 @@ func TestNegotiate(t *testing.T) {
req = &http.Request{Header: http.Header{}}
req.Header.Set("Accept", test.accept)
}
s, err := NegotiateOutputSerializer(req, test.ns)
_, s, err := NegotiateOutputMediaType(req, test.ns, DefaultEndpointRestrictions)
switch {
case err == nil && test.errFn != nil:
t.Errorf("%d: failed: expected error", i)

View File

@ -26,86 +26,97 @@ import (
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
)
// transformObject takes the object as returned by storage and ensures it is in
// the client's desired form, as well as ensuring any API level fields like self-link
// are properly set.
func transformObject(ctx context.Context, obj runtime.Object, opts interface{}, mediaType negotiation.MediaTypeOptions, scope RequestScope, req *http.Request) (runtime.Object, error) {
if _, ok := obj.(*metav1.Status); ok {
return obj, nil
}
if err := setObjectSelfLink(ctx, obj, req, scope.Namer); err != nil {
return nil, err
}
switch target := mediaType.Convert; {
case target == nil:
return obj, nil
case target.Kind == "PartialObjectMetadata" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
return asV1Beta1PartialObjectMetadata(obj)
case target.Kind == "PartialObjectMetadataList" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
return asV1Beta1PartialObjectMetadataList(obj)
case target.Kind == "Table" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
options, ok := opts.(*metav1beta1.TableOptions)
if !ok {
return nil, fmt.Errorf("unexpected TableOptions, got %T", opts)
}
return asV1Beta1Table(ctx, obj, options, scope)
default:
accepted, _ := negotiation.MediaTypesForSerializer(metainternalversion.Codecs)
err := negotiation.NewNotAcceptableError(accepted)
return nil, err
}
}
// optionsForTransform will load and validate any additional query parameter options for
// a conversion or return an error.
func optionsForTransform(mediaType negotiation.MediaTypeOptions, req *http.Request) (interface{}, error) {
switch target := mediaType.Convert; {
case target == nil:
case target.Kind == "Table" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
opts := &metav1beta1.TableOptions{}
if err := metav1beta1.ParameterCodec.DecodeParameters(req.URL.Query(), metav1beta1.SchemeGroupVersion, opts); err != nil {
return nil, err
}
switch errs := validation.ValidateTableOptions(opts); len(errs) {
case 0:
return opts, nil
case 1:
return nil, errors.NewBadRequest(fmt.Sprintf("Unable to convert to Table as requested: %v", errs[0].Error()))
default:
return nil, errors.NewBadRequest(fmt.Sprintf("Unable to convert to Table as requested: %v", errs))
}
}
return nil, nil
}
// targetEncodingForTransform returns the appropriate serializer for the input media type
func targetEncodingForTransform(scope *RequestScope, mediaType negotiation.MediaTypeOptions, req *http.Request) (schema.GroupVersionKind, runtime.NegotiatedSerializer, bool) {
switch target := mediaType.Convert; {
case target == nil:
case target.Kind == "PartialObjectMetadata" && target.GroupVersion() == metav1beta1.SchemeGroupVersion,
target.Kind == "PartialObjectMetadataList" && target.GroupVersion() == metav1beta1.SchemeGroupVersion,
target.Kind == "Table" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
return *target, metainternalversion.Codecs, true
}
return scope.Kind, scope.Serializer, false
}
// transformResponseObject takes an object loaded from storage and performs any necessary transformations.
// Will write the complete response object.
func transformResponseObject(ctx context.Context, scope RequestScope, req *http.Request, w http.ResponseWriter, statusCode int, mediaType negotiation.MediaTypeOptions, result runtime.Object) {
// status objects are ignored for transformation
if _, ok := result.(*metav1.Status); ok {
responsewriters.WriteObject(statusCode, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
return
}
// ensure the self link and empty list array are set
if err := setObjectSelfLink(ctx, result, req, scope.Namer); err != nil {
options, err := optionsForTransform(mediaType, req)
if err != nil {
scope.err(err, w, req)
return
}
trace := scope.Trace
// If conversion was allowed by the scope, perform it before writing the response
switch target := mediaType.Convert; {
case target == nil:
trace.Step("Writing response")
responsewriters.WriteObject(statusCode, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
case target.Kind == "PartialObjectMetadata" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
partial, err := asV1Beta1PartialObjectMetadata(result)
if err != nil {
scope.err(err, w, req)
return
}
if err := writeMetaInternalVersion(partial, statusCode, w, req, &scope, target.GroupVersion()); err != nil {
scope.err(err, w, req)
return
}
case target.Kind == "PartialObjectMetadataList" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
trace.Step("Processing list items")
partial, err := asV1Beta1PartialObjectMetadataList(result)
if err != nil {
scope.err(err, w, req)
return
}
if err := writeMetaInternalVersion(partial, statusCode, w, req, &scope, target.GroupVersion()); err != nil {
scope.err(err, w, req)
return
}
case target.Kind == "Table" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
opts := &metav1beta1.TableOptions{}
trace.Step("Decoding parameters")
if err := metav1beta1.ParameterCodec.DecodeParameters(req.URL.Query(), metav1beta1.SchemeGroupVersion, opts); err != nil {
scope.err(err, w, req)
return
}
table, err := asV1Beta1Table(ctx, result, opts, scope)
if err != nil {
scope.err(err, w, req)
return
}
if err := writeMetaInternalVersion(table, statusCode, w, req, &scope, target.GroupVersion()); err != nil {
scope.err(err, w, req)
return
}
default:
// this block should only be hit if scope AllowsConversion is incorrect
accepted, _ := negotiation.MediaTypesForSerializer(metainternalversion.Codecs)
err := negotiation.NewNotAcceptableError(accepted)
obj, err := transformObject(ctx, result, options, mediaType, scope, req)
if err != nil {
scope.err(err, w, req)
return
}
kind, serializer, _ := targetEncodingForTransform(&scope, mediaType, req)
responsewriters.WriteObjectNegotiated(serializer, &scope, kind.GroupVersion(), w, req, statusCode, obj)
}
// errNotAcceptable indicates Accept negotiation has failed
@ -131,15 +142,11 @@ func (e errNotAcceptable) Status() metav1.Status {
}
func asV1Beta1Table(ctx context.Context, result runtime.Object, opts *metav1beta1.TableOptions, scope RequestScope) (runtime.Object, error) {
trace := scope.Trace
trace.Step("Converting to table")
table, err := scope.TableConvertor.ConvertToTable(ctx, result, opts)
if err != nil {
return nil, err
}
trace.Step("Processing rows")
for i := range table.Rows {
item := &table.Rows[i]
switch opts.IncludeObject {
@ -161,7 +168,6 @@ func asV1Beta1Table(ctx context.Context, result runtime.Object, opts *metav1beta
case metav1beta1.IncludeNone:
item.Object.Object = nil
default:
// TODO: move this to validation on the table options?
err = errors.NewBadRequest(fmt.Sprintf("unrecognized includeObject value: %q", opts.IncludeObject))
return nil, err
}
@ -172,7 +178,6 @@ func asV1Beta1Table(ctx context.Context, result runtime.Object, opts *metav1beta
func asV1Beta1PartialObjectMetadata(result runtime.Object) (runtime.Object, error) {
if meta.IsListType(result) {
// TODO: this should be calculated earlier
err := newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadata, but the requested object is a list (%T)", result))
return nil, err
}
@ -187,7 +192,6 @@ func asV1Beta1PartialObjectMetadata(result runtime.Object) (runtime.Object, erro
func asV1Beta1PartialObjectMetadataList(result runtime.Object) (runtime.Object, error) {
if !meta.IsListType(result) {
// TODO: this should be calculated earlier
return nil, newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadataList, but the requested object is not a list (%T)", result))
}
list := &metav1beta1.PartialObjectMetadataList{}
@ -206,14 +210,3 @@ func asV1Beta1PartialObjectMetadataList(result runtime.Object) (runtime.Object,
}
return list, nil
}
func writeMetaInternalVersion(obj runtime.Object, statusCode int, w http.ResponseWriter, req *http.Request, restrictions negotiation.EndpointRestrictions, target schema.GroupVersion) error {
// renegotiate under the internal version
_, info, err := negotiation.NegotiateOutputMediaType(req, metainternalversion.Codecs, restrictions)
if err != nil {
return err
}
encoder := metainternalversion.Codecs.EncoderForVersion(info.Serializer, target)
responsewriters.SerializeObject(info.MediaType, encoder, w, req, statusCode, obj)
return nil
}

View File

@ -55,23 +55,6 @@ func (w httpResponseWriterWithInit) Write(b []byte) (n int, err error) {
return w.innerW.Write(b)
}
// WriteObject renders a returned runtime.Object to the response as a stream or an encoded object. If the object
// returned by the response implements rest.ResourceStreamer that interface will be used to render the
// response. The Accept header and current API version will be passed in, and the output will be copied
// directly to the response body. If content type is returned it is used, otherwise the content type will
// be "application/octet-stream". All other objects are sent to standard JSON serialization.
func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) {
stream, ok := object.(rest.ResourceStreamer)
if ok {
requestInfo, _ := request.RequestInfoFrom(req.Context())
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
StreamObject(statusCode, gv, s, stream, w, req)
})
return
}
WriteObjectNegotiated(s, gv, w, req, statusCode, object)
}
// StreamObject performs input stream negotiation from a ResourceStreamer and writes that to the response.
// If the client requests a websocket upgrade, negotiate for a websocket reader protocol (because many
// browser clients cannot easily handle binary streaming protocols).
@ -123,9 +106,17 @@ func SerializeObject(mediaType string, encoder runtime.Encoder, innerW http.Resp
}
// WriteObjectNegotiated renders an object in the content type negotiated by the client.
// The context is optional and can be nil.
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
serializer, err := negotiation.NegotiateOutputSerializer(req, s)
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiation.EndpointRestrictions, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
stream, ok := object.(rest.ResourceStreamer)
if ok {
requestInfo, _ := request.RequestInfoFrom(req.Context())
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
StreamObject(statusCode, gv, s, stream, w, req)
})
return
}
_, serializer, err := negotiation.NegotiateOutputMediaType(req, s, restrictions)
if err != nil {
// if original statusCode was not successful we need to return the original error
// we cannot hide it behind negotiation problems
@ -162,7 +153,7 @@ func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupV
return code
}
WriteObjectNegotiated(s, gv, w, req, code, status)
WriteObjectNegotiated(s, negotiation.DefaultEndpointRestrictions, gv, w, req, code, status)
return code
}

View File

@ -79,12 +79,14 @@ func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Reque
responsewriters.ErrorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), w, req)
}
func (scope *RequestScope) AllowsConversion(gvk schema.GroupVersionKind) bool {
func (scope *RequestScope) AllowsConversion(gvk schema.GroupVersionKind, mimeType, mimeSubType string) bool {
// TODO: this is temporary, replace with an abstraction calculated at endpoint installation time
if gvk.GroupVersion() == metav1beta1.SchemeGroupVersion {
switch gvk.Kind {
case "Table":
return scope.TableConvertor != nil
return scope.TableConvertor != nil &&
mimeType == "application" &&
(mimeSubType == "json" || mimeSubType == "yaml")
case "PartialObjectMetadata", "PartialObjectMetadataList":
// TODO: should delineate between lists and non-list endpoints
return true
@ -172,7 +174,7 @@ type responder struct {
}
func (r *responder) Object(statusCode int, obj runtime.Object) {
responsewriters.WriteObject(statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.w, r.req)
responsewriters.WriteObjectNegotiated(r.scope.Serializer, &r.scope, r.scope.Kind.GroupVersion(), r.w, r.req, statusCode, obj)
}
func (r *responder) Error(err error) {

View File

@ -25,13 +25,13 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/httplog"
"k8s.io/apiserver/pkg/util/wsstream"
@ -61,42 +61,56 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
return t.C, t.Stop
}
// serveWatch handles serving requests to the server
// serveWatch will serve a watch response.
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
func serveWatch(watcher watch.Interface, scope RequestScope, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
// negotiate for the stream serializer
serializer, err := negotiation.NegotiateOutputStreamSerializer(req, scope.Serializer)
func serveWatch(watcher watch.Interface, scope RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
options, err := optionsForTransform(mediaTypeOptions, req)
if err != nil {
scope.err(err, w, req)
return
}
// negotiate for the stream serializer from the scope's serializer
serializer, err := negotiation.NegotiateOutputMediaTypeStream(req, scope.Serializer, &scope)
if err != nil {
scope.err(err, w, req)
return
}
framer := serializer.StreamSerializer.Framer
streamSerializer := serializer.StreamSerializer.Serializer
embedded := serializer.Serializer
encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion())
useTextFraming := serializer.EncodesAsText
if framer == nil {
scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), w, req)
return
}
encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion())
useTextFraming := serializer.EncodesAsText
// find the embedded serializer matching the media type
embeddedEncoder := scope.Serializer.EncoderForVersion(embedded, scope.Kind.GroupVersion())
// TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here
mediaType := serializer.MediaType
if mediaType != runtime.ContentTypeJSON {
mediaType += ";stream=watch"
}
ctx := req.Context()
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
scope.err(fmt.Errorf("missing requestInfo"), w, req)
return
// locate the appropriate embedded encoder based on the transform
var embeddedEncoder runtime.Encoder
contentKind, contentSerializer, transform := targetEncodingForTransform(&scope, mediaTypeOptions, req)
if transform {
var embedded runtime.Serializer
for _, supported := range contentSerializer.SupportedMediaTypes() {
if supported.MediaType == serializer.MediaType {
embedded = supported.Serializer
}
}
if embedded == nil {
scope.err(fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer), w, req)
return
}
embeddedEncoder = contentSerializer.EncoderForVersion(embedded, contentKind.GroupVersion())
} else {
embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion())
}
ctx := req.Context()
server := &WatchServer{
Watching: watcher,
Scope: scope,
@ -106,10 +120,20 @@ func serveWatch(watcher watch.Interface, scope RequestScope, req *http.Request,
Framer: framer,
Encoder: encoder,
EmbeddedEncoder: embeddedEncoder,
Fixup: func(obj runtime.Object) {
if err := setSelfLink(obj, requestInfo, scope.Namer); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to set link for object %v: %v", reflect.TypeOf(obj), err))
Fixup: func(obj runtime.Object) runtime.Object {
result, err := transformObject(ctx, obj, options, mediaTypeOptions, scope, req)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err))
return obj
}
// When we are transformed to a table, use the table options as the state for whether we
// should print headers - on watch, we only want to print table headers on the first object
// and omit them on subsequent events.
if tableOptions, ok := options.(*metav1beta1.TableOptions); ok {
tableOptions.NoHeaders = true
}
return result
},
TimeoutFactory: &realTimeoutFactory{timeout},
@ -133,7 +157,8 @@ type WatchServer struct {
Encoder runtime.Encoder
// used to encode the nested object in the watch stream
EmbeddedEncoder runtime.Encoder
Fixup func(runtime.Object)
// used to correct the object before we send it to the serializer
Fixup func(runtime.Object) runtime.Object
TimeoutFactory TimeoutFactory
}
@ -205,8 +230,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
obj := event.Object
s.Fixup(obj)
obj := s.Fixup(event.Object)
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
@ -272,8 +296,7 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
// End of results.
return
}
obj := event.Object
s.Fixup(obj)
obj := s.Fixup(event.Object)
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))

View File

@ -586,7 +586,7 @@ func TestWatchHTTPErrors(t *testing.T) {
Encoder: newCodec,
EmbeddedEncoder: newCodec,
Fixup: func(obj runtime.Object) {},
Fixup: func(obj runtime.Object) runtime.Object { return obj },
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
}
@ -646,7 +646,7 @@ func TestWatchHTTPDynamicClientErrors(t *testing.T) {
Encoder: newCodec,
EmbeddedEncoder: newCodec,
Fixup: func(obj runtime.Object) {},
Fixup: func(obj runtime.Object) runtime.Object { return obj },
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
}
@ -708,7 +708,7 @@ func TestWatchHTTPTimeout(t *testing.T) {
Encoder: newCodec,
EmbeddedEncoder: newCodec,
Fixup: func(obj runtime.Object) {},
Fixup: func(obj runtime.Object) runtime.Object { return obj },
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
}

View File

@ -18,6 +18,7 @@ package resttest
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
@ -1460,7 +1461,7 @@ func (t *Tester) testListTableConversion(obj runtime.Object, assignFn AssignFunc
}
columns := table.ColumnDefinitions
if len(columns) == 0 {
t.Errorf("unexpected number of columns: %v", len(columns))
t.Fatalf("unexpected number of columns: %v\n%#v", len(columns), columns)
}
if !strings.EqualFold(columns[0].Name, "Name") || columns[0].Type != "string" || columns[0].Format != "name" {
t.Errorf("expect column 0 to be the name column: %#v", columns[0])
@ -1505,8 +1506,11 @@ func (t *Tester) testListTableConversion(obj runtime.Object, assignFn AssignFunc
}
}
if len(row.Cells) != len(table.ColumnDefinitions) {
t.Fatalf("unmatched row length on row %d: %#v", i, row.Cells)
}
}
data, _ := json.MarshalIndent(table, "", " ")
t.Logf("%s", string(data))
}
// =============================================================================

View File

@ -73,9 +73,11 @@ func (c defaultTableConvertor) ConvertToTable(ctx context.Context, object runtim
table.SelfLink = m.GetSelfLink()
}
}
table.ColumnDefinitions = []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: swaggerMetadataDescriptions["name"]},
{Name: "Created At", Type: "date", Description: swaggerMetadataDescriptions["creationTimestamp"]},
if opt, ok := tableOptions.(*metav1beta1.TableOptions); !ok || !opt.NoHeaders {
table.ColumnDefinitions = []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: swaggerMetadataDescriptions["name"]},
{Name: "Created At", Type: "date", Description: swaggerMetadataDescriptions["creationTimestamp"]},
}
}
return &table, nil
}