Merge pull request #131170 from azych/fix_goroutine_leak

Fix goroutine leak

Kubernetes-commit: 35b4016d222936e6acd71239d9f3ab128c05baa1
This commit is contained in:
Kubernetes Publisher 2025-05-04 15:32:02 -07:00
commit 8419bc34b9
4 changed files with 94 additions and 3 deletions

1
go.mod
View File

@ -10,6 +10,7 @@ require (
github.com/emicklei/go-restful/v3 v3.11.0
github.com/gogo/protobuf v1.3.2
github.com/stretchr/testify v1.10.0
go.uber.org/goleak v1.3.0
google.golang.org/grpc v1.68.1
k8s.io/api v0.0.0-20250503031400-f7e72be095ee
k8s.io/apimachinery v0.0.0-20250503031111-512f488de379

2
go.sum
View File

@ -98,6 +98,8 @@ go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw=
go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I=
go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s=
go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=

View File

@ -17,6 +17,7 @@ limitations under the License.
package remotecommand
import (
"context"
"encoding/json"
"errors"
"fmt"
@ -116,7 +117,7 @@ func createStreams(req *http.Request, w http.ResponseWriter, opts *Options, supp
if ctx.resizeStream != nil {
ctx.resizeChan = make(chan remotecommand.TerminalSize)
go handleResizeEvents(ctx.resizeStream, ctx.resizeChan)
go handleResizeEvents(req.Context(), ctx.resizeStream, ctx.resizeChan)
}
return ctx, true
@ -409,7 +410,7 @@ WaitForStreams:
// supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it.
func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false }
func handleResizeEvents(stream io.Reader, channel chan<- remotecommand.TerminalSize) {
func handleResizeEvents(reqctx context.Context, stream io.Reader, channel chan<- remotecommand.TerminalSize) {
defer runtime.HandleCrash()
defer close(channel)
@ -419,7 +420,12 @@ func handleResizeEvents(stream io.Reader, channel chan<- remotecommand.TerminalS
if err := decoder.Decode(&size); err != nil {
break
}
channel <- size
select {
case channel <- size:
case <-reqctx.Done():
// To prevent go routine leak.
return
}
}
}

View File

@ -0,0 +1,82 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"bytes"
"context"
"encoding/json"
"io"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"k8s.io/client-go/tools/remotecommand"
)
func TestHandleResizeEvents(t *testing.T) {
var testTerminalSize remotecommand.TerminalSize
rawTerminalSize, err := json.Marshal(&testTerminalSize)
require.NoError(t, err)
testCases := []struct {
name string
resizeStreamData []byte
cancelContext bool
readFromChannel bool
}{
{
name: "data attempted to be sent on the channel; channel not read; context canceled",
resizeStreamData: rawTerminalSize,
cancelContext: true,
},
{
name: "data attempted to be sent on the channel; channel read; context not canceled",
resizeStreamData: rawTerminalSize,
readFromChannel: true,
},
{
name: "no data attempted to be sent on the channel; context canceled",
cancelContext: true,
},
{
name: "no data attempted to be sent on the channel; context not canceled",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
connCtx := connectionContext{
resizeStream: io.NopCloser(bytes.NewReader(testCase.resizeStreamData)),
resizeChan: make(chan remotecommand.TerminalSize),
}
go handleResizeEvents(ctx, connCtx.resizeStream, connCtx.resizeChan)
if testCase.readFromChannel {
gotTerminalSize := <-connCtx.resizeChan
require.Equal(t, gotTerminalSize, testTerminalSize)
}
if testCase.cancelContext {
cancel()
}
goleak.VerifyNone(t)
cancel()
})
}
}