Fix race condition on httpSmartSubTransport

Signed-off-by: Paulo Gomes <paulo.gomes@weave.works>
This commit is contained in:
Paulo Gomes 2022-03-15 14:42:54 +00:00 committed by Sunny
parent 822788b79e
commit d1a7e5d609
No known key found for this signature in database
GPG Key ID: 9F3D25DDFF7FA3CF
1 changed files with 33 additions and 28 deletions

View File

@ -217,6 +217,7 @@ type httpSmartSubtransportStream struct {
sentRequest bool sentRequest bool
recvReply sync.WaitGroup recvReply sync.WaitGroup
httpError error httpError error
m sync.RWMutex
} }
func newManagedHttpStream(owner *httpSmartSubtransport, req *http.Request, client *http.Client) *httpSmartSubtransportStream { func newManagedHttpStream(owner *httpSmartSubtransport, req *http.Request, client *http.Client) *httpSmartSubtransportStream {
@ -244,6 +245,8 @@ func (self *httpSmartSubtransportStream) Read(buf []byte) (int, error) {
self.recvReply.Wait() self.recvReply.Wait()
self.m.RLock()
defer self.m.RUnlock()
if self.httpError != nil { if self.httpError != nil {
return 0, self.httpError return 0, self.httpError
} }
@ -252,6 +255,8 @@ func (self *httpSmartSubtransportStream) Read(buf []byte) (int, error) {
} }
func (self *httpSmartSubtransportStream) Write(buf []byte) (int, error) { func (self *httpSmartSubtransportStream) Write(buf []byte) (int, error) {
self.m.RLock()
defer self.m.RUnlock()
if self.httpError != nil { if self.httpError != nil {
return 0, self.httpError return 0, self.httpError
} }
@ -266,7 +271,11 @@ func (self *httpSmartSubtransportStream) Free() {
func (self *httpSmartSubtransportStream) sendRequestBackground() { func (self *httpSmartSubtransportStream) sendRequestBackground() {
go func() { go func() {
self.httpError = self.sendRequest() err := self.sendRequest()
self.m.Lock()
self.httpError = err
self.m.Unlock()
}() }()
self.sentRequest = true self.sentRequest = true
} }
@ -299,33 +308,29 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
} }
} }
for { req := &http.Request{
req := &http.Request{ Method: self.req.Method,
Method: self.req.Method, URL: self.req.URL,
URL: self.req.URL, Header: self.req.Header,
Header: self.req.Header, }
} if req.Method == "POST" {
if req.Method == "POST" { req.Body = self.reader
req.Body = self.reader req.ContentLength = -1
req.ContentLength = -1
}
req.SetBasicAuth(userName, password)
resp, err = self.client.Do(req)
if err != nil {
return err
}
if resp.StatusCode == http.StatusOK {
break
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
return fmt.Errorf("Unhandled HTTP error %s", resp.Status)
} }
self.sentRequest = true req.SetBasicAuth(userName, password)
self.resp = resp resp, err = self.client.Do(req)
return nil if err != nil {
return err
}
if resp.StatusCode == http.StatusOK {
self.resp = resp
self.sentRequest = true
return nil
}
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
return fmt.Errorf("Unhandled HTTP error %s", resp.Status)
} }