Wire context to logs command and add interrupt handler (#127503)

* Wire context to logs command and add interrupt handler

* Move conditional outside of interrupt handler

Kubernetes-commit: 5826868586d501060a6151d04c511236b11a26d4
This commit is contained in:
Arda Güçlü 2024-10-22 21:49:06 +03:00 committed by Kubernetes Publisher
parent 484ede079f
commit 4c8c153513
4 changed files with 29 additions and 21 deletions

View File

@ -945,12 +945,12 @@ func (o *DebugOptions) handleAttachPod(ctx context.Context, restClientGetter gen
}
if status.State.Terminated != nil {
klog.V(1).Info("Ephemeral container terminated, falling back to logs")
return logOpts(restClientGetter, pod, opts)
return logOpts(ctx, restClientGetter, pod, opts)
}
if err := opts.Run(); err != nil {
fmt.Fprintf(opts.ErrOut, "warning: couldn't attach to pod/%s, falling back to streaming logs: %v\n", podName, err)
return logOpts(restClientGetter, pod, opts)
return logOpts(ctx, restClientGetter, pod, opts)
}
return nil
}
@ -968,7 +968,7 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) *corev1.Con
}
// logOpts logs output from opts to the pods log.
func logOpts(restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Pod, opts *attach.AttachOptions) error {
func logOpts(ctx context.Context, restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Pod, opts *attach.AttachOptions) error {
ctrName, err := opts.GetContainerName(pod)
if err != nil {
return err
@ -979,7 +979,7 @@ func logOpts(restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Po
return err
}
for _, request := range requests {
if err := logs.DefaultConsumeRequest(request, opts.Out); err != nil {
if err := logs.DefaultConsumeRequest(ctx, request, opts.Out); err != nil {
return err
}
}

View File

@ -41,6 +41,7 @@ import (
"k8s.io/kubectl/pkg/util"
"k8s.io/kubectl/pkg/util/completion"
"k8s.io/kubectl/pkg/util/i18n"
"k8s.io/kubectl/pkg/util/interrupt"
"k8s.io/kubectl/pkg/util/templates"
)
@ -124,7 +125,7 @@ type LogsOptions struct {
Options runtime.Object
Resources []string
ConsumeRequestFn func(rest.ResponseWrapper, io.Writer) error
ConsumeRequestFn func(context.Context, rest.ResponseWrapper, io.Writer) error
// PodLogOptions
SinceTime string
@ -375,14 +376,21 @@ func (o LogsOptions) RunLogs() error {
len(requests), o.MaxFollowConcurrency,
)
}
return o.parallelConsumeRequest(requests)
}
return o.sequentialConsumeRequest(requests)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
intr := interrupt.New(nil, cancel)
return intr.Run(func() error {
if o.Follow && len(requests) > 1 {
return o.parallelConsumeRequest(ctx, requests)
}
return o.sequentialConsumeRequest(ctx, requests)
})
}
func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
func (o LogsOptions) parallelConsumeRequest(ctx context.Context, requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
reader, writer := io.Pipe()
wg := &sync.WaitGroup{}
wg.Add(len(requests))
@ -390,7 +398,7 @@ func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]
go func(objRef corev1.ObjectReference, request rest.ResponseWrapper) {
defer wg.Done()
out := o.addPrefixIfNeeded(objRef, writer)
if err := o.ConsumeRequestFn(request, out); err != nil {
if err := o.ConsumeRequestFn(ctx, request, out); err != nil {
if !o.IgnoreLogErrors {
writer.CloseWithError(err)
@ -413,10 +421,10 @@ func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]
return err
}
func (o LogsOptions) sequentialConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
func (o LogsOptions) sequentialConsumeRequest(ctx context.Context, requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
for objRef, request := range requests {
out := o.addPrefixIfNeeded(objRef, o.Out)
if err := o.ConsumeRequestFn(request, out); err != nil {
if err := o.ConsumeRequestFn(ctx, request, out); err != nil {
if !o.IgnoreLogErrors {
return err
}
@ -457,8 +465,8 @@ func (o LogsOptions) addPrefixIfNeeded(ref corev1.ObjectReference, writer io.Wri
// A successful read returns err == nil, not err == io.EOF.
// Because the function is defined to read from request until io.EOF, it does
// not treat an io.EOF as an error to be reported.
func DefaultConsumeRequest(request rest.ResponseWrapper, out io.Writer) error {
readCloser, err := request.Stream(context.TODO())
func DefaultConsumeRequest(ctx context.Context, request rest.ResponseWrapper, out io.Writer) error {
readCloser, err := request.Stream(ctx)
if err != nil {
return err
}

View File

@ -304,7 +304,7 @@ func TestLog(t *testing.T) {
o := NewLogsOptions(streams)
o.LogsForObject = mock.mockLogsForObject
o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
o.ConsumeRequestFn = func(ctx context.Context, req restclient.ResponseWrapper, out io.Writer) error {
return errors.New("Error from the ConsumeRequestFn")
}
return o
@ -378,7 +378,7 @@ func TestLog(t *testing.T) {
o := NewLogsOptions(streams)
o.LogsForObject = mock.mockLogsForObject
o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
o.ConsumeRequestFn = func(ctx context.Context, req restclient.ResponseWrapper, out io.Writer) error {
return errors.New("Error from the ConsumeRequestFn")
}
o.Follow = true
@ -401,7 +401,7 @@ func TestLog(t *testing.T) {
o := NewLogsOptions(streams)
o.LogsForObject = mock.mockLogsForObject
o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
o.ConsumeRequestFn = func(ctx context.Context, req restclient.ResponseWrapper, out io.Writer) error {
return errors.New("Error from the ConsumeRequestFn")
}
o.Follow = true
@ -808,7 +808,7 @@ func TestDefaultConsumeRequest(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
buf := &bytes.Buffer{}
err := DefaultConsumeRequest(test.request, buf)
err := DefaultConsumeRequest(context.TODO(), test.request, buf)
if err != nil && !strings.Contains(err.Error(), test.expectedErr) {
t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expectedErr, err.Error())
@ -932,8 +932,8 @@ type logTestMock struct {
wg *sync.WaitGroup
}
func (l *logTestMock) mockConsumeRequest(request restclient.ResponseWrapper, out io.Writer) error {
readCloser, err := request.Stream(context.Background())
func (l *logTestMock) mockConsumeRequest(ctx context.Context, request restclient.ResponseWrapper, out io.Writer) error {
readCloser, err := request.Stream(ctx)
if err != nil {
return err
}

View File

@ -504,7 +504,7 @@ func logOpts(restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Po
return err
}
for _, request := range requests {
if err := logs.DefaultConsumeRequest(request, opts.Out); err != nil {
if err := logs.DefaultConsumeRequest(context.Background(), request, opts.Out); err != nil {
return err
}
}