apiserver/handlers/watch: encode initialEventsListBlueprint with watchEncoder (#127587)

* apiserver/handlers/get: construct versionedList

* storage/cacher: document caching the serialization of bookmark events

* endpoints/handlers/response: add watchListTransformer

* endpoints/handlers/watch: wire watchListTransformer

Kubernetes-commit: fbf1a0dc181ccbeb9925ad9c284d913a25c16562
This commit is contained in:
Lukasz Szaszkiewicz 2024-10-01 11:27:49 +02:00 committed by Kubernetes Publisher
parent d7cbe7a5f2
commit 36e57697d1
7 changed files with 268 additions and 25 deletions

4
go.mod
View File

@ -49,9 +49,9 @@ require (
gopkg.in/evanphx/json-patch.v4 v4.12.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/square/go-jose.v2 v2.6.0
k8s.io/api v0.0.0-20240920202009-71385f038c10
k8s.io/api v0.0.0-20241001061456-5c8e0b961397
k8s.io/apimachinery v0.0.0-20240929035808-0db5dbf03048
k8s.io/client-go v0.0.0-20240929082523-46093399c4de
k8s.io/client-go v0.0.0-20241001063125-415a0d6789ae
k8s.io/component-base v0.0.0-20240928083227-66de10e147bc
k8s.io/klog/v2 v2.130.1
k8s.io/kms v0.0.0-20240912041232-273c893e4e51

8
go.sum
View File

@ -369,12 +369,12 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20240920202009-71385f038c10 h1:shjQe98Co9zBlDzQkxb5IJEWtReSl7qunr56C4Jgc70=
k8s.io/api v0.0.0-20240920202009-71385f038c10/go.mod h1:KCEt6+W/Yn1Vc48pYXeLf0mGK52kJhvt+rcaUVsIaKQ=
k8s.io/api v0.0.0-20241001061456-5c8e0b961397 h1:nAgFHPXbGIAyhn+LgjFlp3/MpOW8m7TQyO933fTEuH8=
k8s.io/api v0.0.0-20241001061456-5c8e0b961397/go.mod h1:P4IhnNCi/5FdQLo54CpHGU7xr10vOGPd2loe31c00vM=
k8s.io/apimachinery v0.0.0-20240929035808-0db5dbf03048 h1:EWQWppfphUSBwuhuNA4weJ9vJtfHhjVwizTYjZb8ikw=
k8s.io/apimachinery v0.0.0-20240929035808-0db5dbf03048/go.mod h1:5rKPDwwN9qm//xASFCZ83nyYEanHxxhc7pZ8AC4lukY=
k8s.io/client-go v0.0.0-20240929082523-46093399c4de h1:a/AoWBaxo6kjvY29p3PUxc7PDUfyTHQCBb3zoA7L7gw=
k8s.io/client-go v0.0.0-20240929082523-46093399c4de/go.mod h1:rV8uxwGX3sOvz0lP3/B/bLL2+dtQ7QmA4iU+d7u8snk=
k8s.io/client-go v0.0.0-20241001063125-415a0d6789ae h1:ZvG63lxf7qW/7fG6dTliZpWIu3WEuGCbmm2YnH+S1+E=
k8s.io/client-go v0.0.0-20241001063125-415a0d6789ae/go.mod h1:04jylstcnn30FeXy+rr1z1zFxB8wTIc/dW05SM02mgU=
k8s.io/component-base v0.0.0-20240928083227-66de10e147bc h1:rcpUSxWdWOVlC4auITDsLVD8mkDwKoO5ngGUEW1GARg=
k8s.io/component-base v0.0.0-20240928083227-66de10e147bc/go.mod h1:y3+nKeFC4lXl3nEHGAXUtoRopMfxd7um7GOQzZWcdi8=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=

View File

@ -265,6 +265,16 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
if timeout == 0 && minRequestTimeout > 0 {
timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
}
var emptyVersionedList runtime.Object
if isListWatchRequest(opts) {
emptyVersionedList, err = scope.Convertor.ConvertToVersion(r.NewList(), scope.Kind.GroupVersion())
if err != nil {
scope.err(errors.NewInternalError(err), w, req)
return
}
}
klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer func() { cancel() }()
@ -273,7 +283,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
scope.err(err, w, req)
return
}
handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts))
handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts), emptyVersionedList)
if err != nil {
scope.err(err, w, req)
return

View File

@ -18,6 +18,7 @@ package handlers
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
@ -38,8 +39,9 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/storage"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)
// watchEmbeddedEncoder performs encoding of the embedded object.
@ -147,6 +149,8 @@ type watchEncoder struct {
encoder runtime.Encoder
framer io.Writer
watchListTransformerFn watchListTransformerFunction
buffer runtime.Splice
eventBuffer runtime.Splice
@ -154,15 +158,16 @@ type watchEncoder struct {
identifiers map[watch.EventType]runtime.Identifier
}
func newWatchEncoder(ctx context.Context, kind schema.GroupVersionKind, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer) *watchEncoder {
func newWatchEncoder(ctx context.Context, kind schema.GroupVersionKind, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer, watchListTransformerFn watchListTransformerFunction) *watchEncoder {
return &watchEncoder{
ctx: ctx,
kind: kind,
embeddedEncoder: embeddedEncoder,
encoder: encoder,
framer: framer,
buffer: runtime.NewSpliceBuffer(),
eventBuffer: runtime.NewSpliceBuffer(),
ctx: ctx,
kind: kind,
embeddedEncoder: embeddedEncoder,
encoder: encoder,
framer: framer,
watchListTransformerFn: watchListTransformerFn,
buffer: runtime.NewSpliceBuffer(),
eventBuffer: runtime.NewSpliceBuffer(),
}
}
@ -174,6 +179,12 @@ func (e *watchEncoder) Encode(event watch.Event) error {
encodeFunc := func(obj runtime.Object, w io.Writer) error {
return e.doEncode(obj, event, w)
}
if event.Type == watch.Bookmark {
// Bookmark objects are small, and we don't yet support serialization for them.
// Additionally, we need to additionally transform them to support watch-list feature
event = e.watchListTransformerFn(event)
return encodeFunc(event.Object, e.framer)
}
if co, ok := event.Object.(runtime.CacheableObject); ok {
return co.CacheEncode(e.identifier(event.Type), encodeFunc, e.framer)
}
@ -479,3 +490,94 @@ func asPartialObjectMetadataList(result runtime.Object, groupVersion schema.Grou
return nil, newNotAcceptableError(fmt.Sprintf("no PartialObjectMetadataList exists in group version %s", groupVersion))
}
}
// watchListTransformerFunction an optional function
// applied to watchlist bookmark events that transforms
// the embedded object before sending it to a client.
type watchListTransformerFunction func(watch.Event) watch.Event
// watchListTransformer performs transformation of
// a special watchList bookmark event.
//
// The bookmark is annotated with InitialEventsListBlueprintAnnotationKey
// and contains an empty, versioned list that we must encode in the requested format
// (e.g., protobuf, JSON, CBOR) and then store as a base64-encoded string.
type watchListTransformer struct {
initialEventsListBlueprint runtime.Object
targetGVK *schema.GroupVersionKind
negotiatedEncoder runtime.Encoder
buffer runtime.Splice
}
// createWatchListTransformerIfRequested returns a transformer function for watchlist bookmark event.
func newWatchListTransformer(initialEventsListBlueprint runtime.Object, targetGVK *schema.GroupVersionKind, negotiatedEncoder runtime.Encoder) *watchListTransformer {
return &watchListTransformer{
initialEventsListBlueprint: initialEventsListBlueprint,
targetGVK: targetGVK,
negotiatedEncoder: negotiatedEncoder,
buffer: runtime.NewSpliceBuffer(),
}
}
func (e *watchListTransformer) transform(event watch.Event) watch.Event {
if e.initialEventsListBlueprint == nil {
return event
}
hasAnnotation, err := storage.HasInitialEventsEndBookmarkAnnotation(event.Object)
if err != nil {
return newWatchEventErrorFor(err)
}
if !hasAnnotation {
return event
}
if err = e.encodeInitialEventsListBlueprint(event.Object); err != nil {
return newWatchEventErrorFor(err)
}
return event
}
func (e *watchListTransformer) encodeInitialEventsListBlueprint(object runtime.Object) error {
initialEventsListBlueprint, err := e.transformInitialEventsListBlueprint()
if err != nil {
return err
}
defer e.buffer.Reset()
if err = e.negotiatedEncoder.Encode(initialEventsListBlueprint, e.buffer); err != nil {
return err
}
encodedInitialEventsListBlueprint := e.buffer.Bytes()
// the storage layer creates a deep copy of the obj before modifying it.
// since the object has the annotation, we can modify it directly.
objectMeta, err := meta.Accessor(object)
if err != nil {
return err
}
annotations := objectMeta.GetAnnotations()
annotations[metav1.InitialEventsListBlueprintAnnotationKey] = base64.StdEncoding.EncodeToString(encodedInitialEventsListBlueprint)
objectMeta.SetAnnotations(annotations)
return nil
}
func (e *watchListTransformer) transformInitialEventsListBlueprint() (runtime.Object, error) {
if e.targetGVK != nil && e.targetGVK.Kind == "PartialObjectMetadata" {
return asPartialObjectMetadataList(e.initialEventsListBlueprint, e.targetGVK.GroupVersion())
}
return e.initialEventsListBlueprint, nil
}
func newWatchEventErrorFor(err error) watch.Event {
return watch.Event{
Type: watch.Error,
Object: &metav1.Status{
Status: metav1.StatusFailure,
Message: err.Error(),
Reason: metav1.StatusReasonInternalError,
Code: http.StatusInternalServerError,
},
}
}

View File

@ -17,7 +17,9 @@ limitations under the License.
package handlers
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"io"
"net/http"
@ -25,13 +27,18 @@ import (
"testing"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
runtimejson "k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/watch"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
)
var _ runtime.CacheableObject = &mockCacheableObject{}
@ -222,3 +229,118 @@ func TestWatchEncoderIdentifier(t *testing.T) {
t.Error(" - watchEncoder::typeIdentifier to capture all relevant fields in identifier")
}
}
func TestWatchListEncoder(t *testing.T) {
makePartialObjectMetadataListWithoutKind := func(rv string) *metav1.PartialObjectMetadataList {
return &metav1.PartialObjectMetadataList{
// do not set the type info to match
// newWatchListTransformer
ListMeta: metav1.ListMeta{ResourceVersion: rv},
}
}
makePodListWithKind := func(rv string) *v1.PodList {
return &v1.PodList{
TypeMeta: metav1.TypeMeta{
// set the type info so
// that it differs from
// PartialObjectMetadataList
Kind: "PodList",
},
ListMeta: metav1.ListMeta{
ResourceVersion: rv,
},
}
}
makeBookmarkEventFor := func(pod *v1.Pod) watch.Event {
return watch.Event{
Type: watch.Bookmark,
Object: pod,
}
}
makePod := func(name string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "ns",
Annotations: map[string]string{},
},
}
}
makePodWithInitialEventsAnnotation := func(name string) *v1.Pod {
p := makePod(name)
p.Annotations[metav1.InitialEventsAnnotationKey] = "true"
return p
}
scenarios := []struct {
name string
negotiatedEncoder runtime.Serializer
targetGVK *schema.GroupVersionKind
actualEvent watch.Event
listBlueprint runtime.Object
expectedBase64ListBlueprint string
}{
{
name: "pass through, an obj without the annotation received",
actualEvent: makeBookmarkEventFor(makePod("1")),
negotiatedEncoder: newJSONSerializer(),
},
{
name: "encodes the initialEventsListBlueprint if an obj with the annotation is passed",
actualEvent: makeBookmarkEventFor(makePodWithInitialEventsAnnotation("1")),
listBlueprint: makePodListWithKind("100"),
expectedBase64ListBlueprint: encodeObjectToBase64String(makePodListWithKind("100"), t),
negotiatedEncoder: newJSONSerializer(),
},
{
name: "encodes the initialEventsListBlueprint as PartialObjectMetadata when requested",
targetGVK: &schema.GroupVersionKind{Group: "meta.k8s.io", Version: "v1", Kind: "PartialObjectMetadata"},
actualEvent: makeBookmarkEventFor(makePodWithInitialEventsAnnotation("2")),
listBlueprint: makePodListWithKind("101"),
expectedBase64ListBlueprint: encodeObjectToBase64String(makePartialObjectMetadataListWithoutKind("101"), t),
negotiatedEncoder: newJSONSerializer(),
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
target := newWatchListTransformer(scenario.listBlueprint, scenario.targetGVK, scenario.negotiatedEncoder)
transformedEvent := target.transform(scenario.actualEvent)
actualObjectMeta, err := meta.Accessor(transformedEvent.Object)
if err != nil {
t.Fatal(err)
}
base64ListBlueprint, ok := actualObjectMeta.GetAnnotations()[metav1.InitialEventsListBlueprintAnnotationKey]
if !ok && len(scenario.expectedBase64ListBlueprint) != 0 {
t.Fatalf("the encoded obj doesn't have %q", metav1.InitialEventsListBlueprintAnnotationKey)
}
if base64ListBlueprint != scenario.expectedBase64ListBlueprint {
t.Fatalf("unexpected base64ListBlueprint = %s, expected = %s", base64ListBlueprint, scenario.expectedBase64ListBlueprint)
}
})
}
}
func encodeObjectToBase64String(obj runtime.Object, t *testing.T) string {
e := newJSONSerializer()
var buf bytes.Buffer
err := e.Encode(obj, &buf)
if err != nil {
t.Fatal(err)
}
return base64.StdEncoding.EncodeToString(buf.Bytes())
}
func newJSONSerializer() runtime.Serializer {
return runtimejson.NewSerializerWithOptions(
runtimejson.DefaultMetaFactory,
clientgoscheme.Scheme,
clientgoscheme.Scheme,
runtimejson.SerializerOptions{},
)
}

View File

@ -64,7 +64,7 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
// serveWatchHandler returns a handle to serve a watch response.
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string) (http.Handler, error) {
func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string, initialEventsListBlueprint runtime.Object) (http.Handler, error) {
options, err := optionsForTransform(mediaTypeOptions, req)
if err != nil {
return nil, err
@ -91,25 +91,25 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
ctx := req.Context()
// locate the appropriate embedded encoder based on the transform
var embeddedEncoder runtime.Encoder
var negotiatedEncoder runtime.Encoder
contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions, req)
if transform {
info, ok := runtime.SerializerInfoForMediaType(contentSerializer.SupportedMediaTypes(), serializer.MediaType)
if !ok {
return nil, fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer)
}
embeddedEncoder = contentSerializer.EncoderForVersion(info.Serializer, contentKind.GroupVersion())
negotiatedEncoder = contentSerializer.EncoderForVersion(info.Serializer, contentKind.GroupVersion())
} else {
embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion())
negotiatedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion())
}
var memoryAllocator runtime.MemoryAllocator
if encoderWithAllocator, supportsAllocator := embeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
if encoderWithAllocator, supportsAllocator := negotiatedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
embeddedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator)
negotiatedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator)
}
var tableOptions *metav1.TableOptions
if options != nil {
@ -119,7 +119,7 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
return nil, fmt.Errorf("unexpected options type: %T", options)
}
}
embeddedEncoder = newWatchEmbeddedEncoder(ctx, embeddedEncoder, mediaTypeOptions.Convert, tableOptions, scope)
embeddedEncoder := newWatchEmbeddedEncoder(ctx, negotiatedEncoder, mediaTypeOptions.Convert, tableOptions, scope)
if encoderWithAllocator, supportsAllocator := encoder.(runtime.EncoderWithAllocator); supportsAllocator {
if memoryAllocator == nil {
@ -145,6 +145,8 @@ func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOp
Encoder: encoder,
EmbeddedEncoder: embeddedEncoder,
watchListTransformerFn: newWatchListTransformer(initialEventsListBlueprint, mediaTypeOptions.Convert, negotiatedEncoder).transform,
MemoryAllocator: memoryAllocator,
TimeoutFactory: &realTimeoutFactory{timeout},
ServerShuttingDownCh: serverShuttingDownCh,
@ -174,6 +176,10 @@ type WatchServer struct {
Encoder runtime.Encoder
// used to encode the nested object in the watch stream
EmbeddedEncoder runtime.Encoder
// watchListTransformerFn a function applied
// to watchlist bookmark events that transforms
// the embedded object before sending it to a client.
watchListTransformerFn watchListTransformerFunction
MemoryAllocator runtime.MemoryAllocator
TimeoutFactory TimeoutFactory
@ -219,7 +225,7 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) {
flusher.Flush()
kind := s.Scope.Kind
watchEncoder := newWatchEncoder(req.Context(), kind, s.EmbeddedEncoder, s.Encoder, framer)
watchEncoder := newWatchEncoder(req.Context(), kind, s.EmbeddedEncoder, s.Encoder, framer, s.watchListTransformerFn)
ch := s.Watching.ResultChan()
done := req.Context().Done()
@ -288,7 +294,7 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
framer := newWebsocketFramer(ws, s.UseTextFraming)
kind := s.Scope.Kind
watchEncoder := newWatchEncoder(context.TODO(), kind, s.EmbeddedEncoder, s.Encoder, framer)
watchEncoder := newWatchEncoder(context.TODO(), kind, s.EmbeddedEncoder, s.Encoder, framer, s.watchListTransformerFn)
ch := s.Watching.ResultChan()
for {

View File

@ -1130,6 +1130,9 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
// Since add() can block, we explicitly add when cacher is unlocked.
// Dispatching event in nonblocking way first, which make faster watchers
// not be blocked by slower ones.
//
// Note: if we ever decide to cache the serialization of bookmark events,
// we will also need to modify the watchEncoder encoder
if event.Type == watch.Bookmark {
for _, watcher := range c.watchersBuffer {
watcher.nonblockingAdd(event)