Merge pull request #120300 from wojtek-t/refactor_streaming_watch_encoder
Refactor streaming watch encoder to enable caching

Kubernetes-commit: 2a4d5c5fd52492ceac500555579a28701d1092ce
commit 96ed0730bb
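The point of the refactor is that watchEncoder hands serialization off to runtime.CacheableObject whenever the event object implements it, keyed by an identifier derived from the encoders and the event type, so the expensive encode can be computed once and replayed for every watcher that shares the same encoders. As a rough, hypothetical sketch of the consumer side (not part of this commit; the cachingObject type and its fields below are illustrative only, loosely modeled on the caching wrapper used by the apiserver's watch cache):

	package example

	import (
		"bytes"
		"io"
		"sync"

		"k8s.io/apimachinery/pkg/runtime"
	)

	// cachingObject is a hypothetical runtime.CacheableObject wrapper: it caches
	// the serialized form of the wrapped object per encoder Identifier, so that
	// fanning the same event out to many watchers serializes it only once.
	type cachingObject struct {
		object runtime.Object

		lock  sync.Mutex
		cache map[runtime.Identifier][]byte
	}

	// CacheEncode writes the serialization for the given identifier, computing it
	// with encode on a cache miss and reusing the cached bytes on a hit.
	func (c *cachingObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
		c.lock.Lock()
		defer c.lock.Unlock()
		if c.cache == nil {
			c.cache = map[runtime.Identifier][]byte{}
		}
		if raw, ok := c.cache[id]; ok {
			_, err := w.Write(raw)
			return err
		}
		var buf bytes.Buffer
		if err := encode(c.object, &buf); err != nil {
			return err
		}
		c.cache[id] = buf.Bytes()
		_, err := w.Write(buf.Bytes())
		return err
	}

	// GetObject returns the wrapped object, as required by runtime.CacheableObject.
	func (c *cachingObject) GetObject() runtime.Object {
		return c.object
	}

With such a wrapper in place, the co.CacheEncode(e.identifier(event.Type), encodeFunc, e.framer) call in watchEncoder.Encode below pays the serialization cost only for the first watcher per (embedded encoder, encoder, event type) combination.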
@@ -33,8 +33,10 @@ import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
	"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
	"k8s.io/apiserver/pkg/endpoints/metrics"
	endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"

	klog "k8s.io/klog/v2"
@@ -135,6 +137,113 @@ func (e *watchEmbeddedEncoder) embeddedIdentifier() runtime.Identifier {
	return runtime.Identifier(result)
}

+// watchEncoder performs encoding of the watch events.
+//
+// NOTE: watchEncoder is NOT thread-safe.
+type watchEncoder struct {
+	ctx             context.Context
+	kind            schema.GroupVersionKind
+	embeddedEncoder runtime.Encoder
+	encoder         runtime.Encoder
+	framer          io.Writer
+
+	buffer      runtime.Splice
+	eventBuffer runtime.Splice
+
+	currentEmbeddedIdentifier runtime.Identifier
+	identifiers               map[watch.EventType]runtime.Identifier
+}
+
+func newWatchEncoder(ctx context.Context, kind schema.GroupVersionKind, embeddedEncoder runtime.Encoder, encoder runtime.Encoder, framer io.Writer) *watchEncoder {
+	return &watchEncoder{
+		ctx:             ctx,
+		kind:            kind,
+		embeddedEncoder: embeddedEncoder,
+		encoder:         encoder,
+		framer:          framer,
+		buffer:          runtime.NewSpliceBuffer(),
+		eventBuffer:     runtime.NewSpliceBuffer(),
+	}
+}
+
+// Encode encodes a given watch event.
+// NOTE: if the event object implements the CacheableObject interface,
+// the serialized version is cached in that object (not in the event itself).
+func (e *watchEncoder) Encode(event watch.Event) error {
+	encodeFunc := func(obj runtime.Object, w io.Writer) error {
+		return e.doEncode(obj, event, w)
+	}
+	if co, ok := event.Object.(runtime.CacheableObject); ok {
+		return co.CacheEncode(e.identifier(event.Type), encodeFunc, e.framer)
+	}
+	return encodeFunc(event.Object, e.framer)
+}
+
+func (e *watchEncoder) doEncode(obj runtime.Object, event watch.Event, w io.Writer) error {
+	defer e.buffer.Reset()
+
+	if err := e.embeddedEncoder.Encode(obj, e.buffer); err != nil {
+		return fmt.Errorf("unable to encode watch object %T: %v", obj, err)
+	}
+
+	// ContentType is not required here because we are defaulting to the serializer type.
+	outEvent := &metav1.WatchEvent{
+		Type:   string(event.Type),
+		Object: runtime.RawExtension{Raw: e.buffer.Bytes()},
+	}
+	metrics.WatchEventsSizes.WithContext(e.ctx).WithLabelValues(e.kind.Group, e.kind.Version, e.kind.Kind).Observe(float64(len(outEvent.Object.Raw)))
+
+	defer e.eventBuffer.Reset()
+	if err := e.encoder.Encode(outEvent, e.eventBuffer); err != nil {
+		return fmt.Errorf("unable to encode watch object %T: %v (%#v)", outEvent, err, e)
+	}
+
+	_, err := w.Write(e.eventBuffer.Bytes())
+	return err
+}
+
+type watchEncoderIdentifier struct {
+	Name            string `json:"name,omitempty"`
+	EmbeddedEncoder string `json:"embeddedEncoder,omitempty"`
+	Encoder         string `json:"encoder,omitempty"`
+	EventType       string `json:"eventType,omitempty"`
+}
+
+func (e *watchEncoder) identifier(eventType watch.EventType) runtime.Identifier {
+	// We need to take into account that if the embeddedEncoder includes a table
+	// transformer, its identifier is dynamic. As a result, whenever the
+	// identifier of embeddedEncoder changes, we need to invalidate the whole
+	// identifiers cache.
+	// TODO(wojtek-t): Can we optimize it somehow?
+	if e.currentEmbeddedIdentifier != e.embeddedEncoder.Identifier() {
+		e.currentEmbeddedIdentifier = e.embeddedEncoder.Identifier()
+		e.identifiers = map[watch.EventType]runtime.Identifier{}
+	}
+	if _, ok := e.identifiers[eventType]; !ok {
+		e.identifiers[eventType] = e.typeIdentifier(eventType)
+	}
+	return e.identifiers[eventType]
+}
+
+func (e *watchEncoder) typeIdentifier(eventType watch.EventType) runtime.Identifier {
+	// The eventType is a non-standard pattern. It comes from the fact that
+	// we're effectively serializing the whole watch event, but storing it
+	// in serializations of the Object within the watch event.
+	identifier := watchEncoderIdentifier{
+		Name:            "watch",
+		EmbeddedEncoder: string(e.embeddedEncoder.Identifier()),
+		Encoder:         string(e.encoder.Identifier()),
+		EventType:       string(eventType),
+	}
+
+	result, err := json.Marshal(identifier)
+	if err != nil {
+		klog.Fatalf("Failed marshaling identifier for watchEncoder: %v", err)
+	}
+	return runtime.Identifier(result)
+}
+
// doTransformResponseObject is used for handling all requests, including watch.
func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}, target *schema.GroupVersionKind, scope *RequestScope) (runtime.Object, error) {
	if _, ok := obj.(*metav1.Status); ok {
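For reference, the cache key built by typeIdentifier is simply the JSON form of watchEncoderIdentifier, so for an ADDED event it looks roughly like the following (the two encoder identifier values are placeholders, not real output):

	{"name":"watch","embeddedEncoder":"<embedded-encoder-identifier>","encoder":"<encoder-identifier>","eventType":"ADDED"}

Because the embedded encoder's identifier is part of the key, a table-transforming encoder with a dynamic identifier changes the key, which is exactly why identifier() throws away the per-event-type cache whenever that identifier changes.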
@@ -212,3 +212,13 @@ func TestAsPartialObjectMetadataList(t *testing.T) {
		})
	}
}
+
+func TestWatchEncoderIdentifier(t *testing.T) {
+	eventFields := reflect.VisibleFields(reflect.TypeOf(metav1.WatchEvent{}))
+	if len(eventFields) != 2 {
+		t.Error("New field was added to metav1.WatchEvent.")
+		t.Error(" Ensure that the following places are updated accordingly:")
+		t.Error(" - watchEncoder::doEncode method when creating outEvent")
+		t.Error(" - watchEncoder::typeIdentifier to capture all relevant fields in identifier")
+	}
+}
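The new test is a tripwire: if a field is ever added to metav1.WatchEvent, both doEncode (which builds outEvent only from Type and Object) and typeIdentifier (which must key the cache on everything that affects the output) need to be revisited. A minimal standalone look at what reflect.VisibleFields reports today (not part of the commit):

	package main

	import (
		"fmt"
		"reflect"

		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	)

	func main() {
		// Expected output: two fields, Type (string) and Object (runtime.RawExtension).
		for _, f := range reflect.VisibleFields(reflect.TypeOf(metav1.WatchEvent{})) {
			fmt.Println(f.Name, f.Type)
		}
	}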
@@ -27,7 +27,6 @@ import (
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
	"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/watch"
@@ -213,9 +212,6 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
		return
	}

-	var e streaming.Encoder
-	e = streaming.NewEncoder(framer, s.Encoder)
-
	// ensure the connection times out
	timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
	defer cleanup()
@@ -226,10 +222,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	w.WriteHeader(http.StatusOK)
	flusher.Flush()

-	var unknown runtime.Unknown
-	internalEvent := &metav1.InternalEvent{}
-	outEvent := &metav1.WatchEvent{}
-	buf := runtime.NewSpliceBuffer()
+	watchEncoder := newWatchEncoder(req.Context(), kind, s.EmbeddedEncoder, s.Encoder, framer)
	ch := s.Watching.ResultChan()
	done := req.Context().Done()

@@ -256,43 +249,18 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
			metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
			isWatchListLatencyRecordingRequired := shouldRecordWatchListLatency(event)

-			if err := s.EmbeddedEncoder.Encode(event.Object, buf); err != nil {
-				// unexpected error
-				utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", event.Object, err))
-				return
-			}
-
-			// ContentType is not required here because we are defaulting to the serializer
-			// type
-			unknown.Raw = buf.Bytes()
-			event.Object = &unknown
-			metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw)))
-
-			*outEvent = metav1.WatchEvent{}
-
-			// create the external type directly and encode it. Clients will only recognize the serialization we provide.
-			// The internal event is being reused, not reallocated so its just a few extra assignments to do it this way
-			// and we get the benefit of using conversion functions which already have to stay in sync
-			*internalEvent = metav1.InternalEvent(event)
-			err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(internalEvent, outEvent, nil)
-			if err != nil {
-				utilruntime.HandleError(fmt.Errorf("unable to convert watch object: %v", err))
-				// client disconnect.
-				return
-			}
-			if err := e.Encode(outEvent); err != nil {
-				utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v (%#v)", outEvent, err, e))
+			if err := watchEncoder.Encode(event); err != nil {
+				utilruntime.HandleError(err)
				// client disconnect.
				return
			}

			if len(ch) == 0 {
				flusher.Flush()
			}
			if isWatchListLatencyRecordingRequired {
				metrics.RecordWatchListLatency(req.Context(), s.Scope.Resource, s.metricsScope)
			}
-
-			buf.Reset()
		}
	}
}