Merge pull request #122036 from linxiulei/cleanup
Minor cleanup for handlers/watch Kubernetes-commit: 368dfe3a88d2227dd192c0160fbdf0b081950d29
This commit is contained in:
		
						commit
						78157c73d3
					
				|  | @ -265,9 +265,15 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc | |||
| 				scope.err(err, w, req) | ||||
| 				return | ||||
| 			} | ||||
| 			handler, err := serveWatchHandler(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts)) | ||||
| 			if err != nil { | ||||
| 				scope.err(err, w, req) | ||||
| 				return | ||||
| 			} | ||||
| 			requestInfo, _ := request.RequestInfoFrom(ctx) | ||||
| 			metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() { | ||||
| 				serveWatch(watcher, scope, outputMediaType, req, w, timeout, metrics.CleanListScope(ctx, &opts)) | ||||
| 				defer watcher.Stop() | ||||
| 				handler.ServeHTTP(w, req) | ||||
| 			}) | ||||
| 			return | ||||
| 		} | ||||
|  |  | |||
|  | @ -62,30 +62,25 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { | |||
| 	return t.C, t.Stop | ||||
| } | ||||
| 
 | ||||
| // serveWatch will serve a watch response.
 | ||||
| // serveWatchHandler returns a handle to serve a watch response.
 | ||||
| // TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
 | ||||
| func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string) { | ||||
| 	defer watcher.Stop() | ||||
| 
 | ||||
| func serveWatchHandler(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration, metricsScope string) (http.Handler, error) { | ||||
| 	options, err := optionsForTransform(mediaTypeOptions, req) | ||||
| 	if err != nil { | ||||
| 		scope.err(err, w, req) | ||||
| 		return | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	// negotiate for the stream serializer from the scope's serializer
 | ||||
| 	serializer, err := negotiation.NegotiateOutputMediaTypeStream(req, scope.Serializer, scope) | ||||
| 	if err != nil { | ||||
| 		scope.err(err, w, req) | ||||
| 		return | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	framer := serializer.StreamSerializer.Framer | ||||
| 	streamSerializer := serializer.StreamSerializer.Serializer | ||||
| 	encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion()) | ||||
| 	useTextFraming := serializer.EncodesAsText | ||||
| 	if framer == nil { | ||||
| 		scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), w, req) | ||||
| 		return | ||||
| 		return nil, fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType) | ||||
| 	} | ||||
| 	// TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here
 | ||||
| 	mediaType := serializer.MediaType | ||||
|  | @ -101,8 +96,7 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n | |||
| 	if transform { | ||||
| 		info, ok := runtime.SerializerInfoForMediaType(contentSerializer.SupportedMediaTypes(), serializer.MediaType) | ||||
| 		if !ok { | ||||
| 			scope.err(fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer), w, req) | ||||
| 			return | ||||
| 			return nil, fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer) | ||||
| 		} | ||||
| 		embeddedEncoder = contentSerializer.EncoderForVersion(info.Serializer, contentKind.GroupVersion()) | ||||
| 	} else { | ||||
|  | @ -115,7 +109,6 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n | |||
| 		// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
 | ||||
| 		// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
 | ||||
| 		memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) | ||||
| 		defer runtime.AllocatorPool.Put(memoryAllocator) | ||||
| 		embeddedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator) | ||||
| 	} | ||||
| 	var tableOptions *metav1.TableOptions | ||||
|  | @ -123,8 +116,7 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n | |||
| 		if passedOptions, ok := options.(*metav1.TableOptions); ok { | ||||
| 			tableOptions = passedOptions | ||||
| 		} else { | ||||
| 			scope.err(fmt.Errorf("unexpected options type: %T", options), w, req) | ||||
| 			return | ||||
| 			return nil, fmt.Errorf("unexpected options type: %T", options) | ||||
| 		} | ||||
| 	} | ||||
| 	embeddedEncoder = newWatchEmbeddedEncoder(ctx, embeddedEncoder, mediaTypeOptions.Convert, tableOptions, scope) | ||||
|  | @ -134,7 +126,6 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n | |||
| 			// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
 | ||||
| 			// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
 | ||||
| 			memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) | ||||
| 			defer runtime.AllocatorPool.Put(memoryAllocator) | ||||
| 		} | ||||
| 		encoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator) | ||||
| 	} | ||||
|  | @ -154,6 +145,7 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n | |||
| 		Encoder:         encoder, | ||||
| 		EmbeddedEncoder: embeddedEncoder, | ||||
| 
 | ||||
| 		MemoryAllocator:      memoryAllocator, | ||||
| 		TimeoutFactory:       &realTimeoutFactory{timeout}, | ||||
| 		ServerShuttingDownCh: serverShuttingDownCh, | ||||
| 
 | ||||
|  | @ -162,10 +154,9 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n | |||
| 
 | ||||
| 	if wsstream.IsWebSocketRequest(req) { | ||||
| 		w.Header().Set("Content-Type", server.MediaType) | ||||
| 		websocket.Handler(server.HandleWS).ServeHTTP(w, req) | ||||
| 		return | ||||
| 		return websocket.Handler(server.HandleWS), nil | ||||
| 	} | ||||
| 	server.HandleHTTP(w, req) | ||||
| 	return http.HandlerFunc(server.HandleHTTP), nil | ||||
| } | ||||
| 
 | ||||
| // WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
 | ||||
|  | @ -184,6 +175,7 @@ type WatchServer struct { | |||
| 	// used to encode the nested object in the watch stream
 | ||||
| 	EmbeddedEncoder runtime.Encoder | ||||
| 
 | ||||
| 	MemoryAllocator      runtime.MemoryAllocator | ||||
| 	TimeoutFactory       TimeoutFactory | ||||
| 	ServerShuttingDownCh <-chan struct{} | ||||
| 
 | ||||
|  | @ -193,6 +185,12 @@ type WatchServer struct { | |||
| // HandleHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked.
 | ||||
| // or over a websocket connection.
 | ||||
| func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) { | ||||
| 	defer func() { | ||||
| 		if s.MemoryAllocator != nil { | ||||
| 			runtime.AllocatorPool.Put(s.MemoryAllocator) | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	flusher, ok := w.(http.Flusher) | ||||
| 	if !ok { | ||||
| 		err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w) | ||||
|  | @ -266,8 +264,17 @@ func (s *WatchServer) HandleHTTP(w http.ResponseWriter, req *http.Request) { | |||
| 
 | ||||
| // HandleWS serves a series of encoded events over a websocket connection.
 | ||||
| func (s *WatchServer) HandleWS(ws *websocket.Conn) { | ||||
| 	defer func() { | ||||
| 		if s.MemoryAllocator != nil { | ||||
| 			runtime.AllocatorPool.Put(s.MemoryAllocator) | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	defer ws.Close() | ||||
| 	done := make(chan struct{}) | ||||
| 	// ensure the connection times out
 | ||||
| 	timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh() | ||||
| 	defer cleanup() | ||||
| 
 | ||||
| 	go func() { | ||||
| 		defer utilruntime.HandleCrash() | ||||
|  | @ -288,6 +295,8 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) { | |||
| 		select { | ||||
| 		case <-done: | ||||
| 			return | ||||
| 		case <-timeoutCh: | ||||
| 			return | ||||
| 		case event, ok := <-ch: | ||||
| 			if !ok { | ||||
| 				// End of results.
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue