diff --git a/go.mod b/go.mod index 546216a..fdc6ee2 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/emicklei/go-restful/v3 v3.11.0 github.com/gogo/protobuf v1.3.2 github.com/stretchr/testify v1.10.0 + go.uber.org/goleak v1.3.0 google.golang.org/grpc v1.68.1 k8s.io/api v0.0.0-20250503031400-f7e72be095ee k8s.io/apimachinery v0.0.0-20250503031111-512f488de379 diff --git a/go.sum b/go.sum index 2749158..bae2c34 100644 --- a/go.sum +++ b/go.sum @@ -98,6 +98,8 @@ go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/pkg/cri/streaming/remotecommand/httpstream.go b/pkg/cri/streaming/remotecommand/httpstream.go index 92ab045..62e301a 100644 --- a/pkg/cri/streaming/remotecommand/httpstream.go +++ b/pkg/cri/streaming/remotecommand/httpstream.go @@ -17,6 +17,7 @@ limitations under the License. package remotecommand import ( + "context" "encoding/json" "errors" "fmt" @@ -116,7 +117,7 @@ func createStreams(req *http.Request, w http.ResponseWriter, opts *Options, supp if ctx.resizeStream != nil { ctx.resizeChan = make(chan remotecommand.TerminalSize) - go handleResizeEvents(ctx.resizeStream, ctx.resizeChan) + go handleResizeEvents(req.Context(), ctx.resizeStream, ctx.resizeChan) } return ctx, true @@ -409,7 +410,7 @@ WaitForStreams: // supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it. func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false } -func handleResizeEvents(stream io.Reader, channel chan<- remotecommand.TerminalSize) { +func handleResizeEvents(reqctx context.Context, stream io.Reader, channel chan<- remotecommand.TerminalSize) { defer runtime.HandleCrash() defer close(channel) @@ -419,7 +420,12 @@ func handleResizeEvents(stream io.Reader, channel chan<- remotecommand.TerminalS if err := decoder.Decode(&size); err != nil { break } - channel <- size + select { + case channel <- size: + case <-reqctx.Done(): + // To prevent go routine leak. + return + } } } diff --git a/pkg/cri/streaming/remotecommand/httpstream_test.go b/pkg/cri/streaming/remotecommand/httpstream_test.go new file mode 100644 index 0000000..c2781f6 --- /dev/null +++ b/pkg/cri/streaming/remotecommand/httpstream_test.go @@ -0,0 +1,82 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remotecommand + +import ( + "bytes" + "context" + "encoding/json" + "io" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "k8s.io/client-go/tools/remotecommand" +) + +func TestHandleResizeEvents(t *testing.T) { + var testTerminalSize remotecommand.TerminalSize + rawTerminalSize, err := json.Marshal(&testTerminalSize) + require.NoError(t, err) + + testCases := []struct { + name string + resizeStreamData []byte + cancelContext bool + readFromChannel bool + }{ + { + name: "data attempted to be sent on the channel; channel not read; context canceled", + resizeStreamData: rawTerminalSize, + cancelContext: true, + }, + { + name: "data attempted to be sent on the channel; channel read; context not canceled", + resizeStreamData: rawTerminalSize, + readFromChannel: true, + }, + { + name: "no data attempted to be sent on the channel; context canceled", + cancelContext: true, + }, + { + name: "no data attempted to be sent on the channel; context not canceled", + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + connCtx := connectionContext{ + resizeStream: io.NopCloser(bytes.NewReader(testCase.resizeStreamData)), + resizeChan: make(chan remotecommand.TerminalSize), + } + + go handleResizeEvents(ctx, connCtx.resizeStream, connCtx.resizeChan) + if testCase.readFromChannel { + gotTerminalSize := <-connCtx.resizeChan + require.Equal(t, gotTerminalSize, testTerminalSize) + } + if testCase.cancelContext { + cancel() + } + + goleak.VerifyNone(t) + cancel() + }) + } +}