Fix PipeWriter#CloseWithError race on go1.13

Kubernetes-commit: 399ab7aadca3879e040a5f2c4fde5451f838eb66
This commit is contained in:
Jordan Liggitt 2021-02-24 15:35:03 -05:00 committed by Kubernetes Publisher
parent 872a00fe90
commit 31a13e236e
1 changed files with 6 additions and 1 deletions

View File

@ -24,6 +24,7 @@ import (
"io"
"regexp"
"sync"
"sync/atomic"
"time"
"github.com/spf13/cobra"
@ -337,12 +338,14 @@ func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]
reader, writer := io.Pipe()
wg := &sync.WaitGroup{}
wg.Add(len(requests))
closedWithError := int32(0)
for objRef, request := range requests {
go func(objRef corev1.ObjectReference, request rest.ResponseWrapper) {
defer wg.Done()
out := o.addPrefixIfNeeded(objRef, writer)
if err := o.ConsumeRequestFn(request, out); err != nil {
if !o.IgnoreLogErrors {
atomic.StoreInt32(&closedWithError, 1)
writer.CloseWithError(err)
// It's important to return here to propagate the error via the pipe
@ -357,7 +360,9 @@ func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]
go func() {
wg.Wait()
writer.Close()
if atomic.LoadInt32(&closedWithError) == 0 {
writer.Close()
}
}()
_, err := io.Copy(o.Out, reader)