Reuse HTTP connections in managed transport

Ensure all requests are completely processed and closed,
to prove odds of the underlying connections to be reused.

The transport now is pooled and reused whenever possible.

Signed-off-by: Paulo Gomes <paulo.gomes@weave.works>
This commit is contained in:
Paulo Gomes 2022-03-24 20:35:12 +00:00
parent 3819ac37bc
commit a860ebee04
No known key found for this signature in database
GPG Key ID: 9995233870E99BEE
1 changed files with 26 additions and 22 deletions

View File

@ -50,12 +50,11 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"sync"
"time"
"github.com/fluxcd/source-controller/internal/transport"
git2go "github.com/libgit2/git2go/v33"
)
@ -81,7 +80,8 @@ func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Trans
}
type httpSmartSubtransport struct {
transport *git2go.Transport
transport *git2go.Transport
httpTransport *http.Transport
}
func (t *httpSmartSubtransport) Action(targetUrl string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
@ -104,25 +104,11 @@ func (t *httpSmartSubtransport) Action(targetUrl string, action git2go.SmartServ
proxyFn = http.ProxyURL(parsedUrl)
}
httpTransport := &http.Transport{
// Add the proxy to the http transport.
Proxy: proxyFn,
// reuses the http transport from a pool, or create new one on demand.
t.httpTransport = transport.NewOrIdle(nil)
t.httpTransport.Proxy = proxyFn
// Set reasonable timeouts to ensure connections are not
// left open in an idle state, nor they hang indefinitely.
//
// These are based on the official go http.DefaultTransport:
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
client, req, err := createClientRequest(targetUrl, action, httpTransport)
client, req, err := createClientRequest(targetUrl, action, t.httpTransport)
if err != nil {
return nil, err
}
@ -291,6 +277,10 @@ func (self *httpSmartSubtransportStream) Write(buf []byte) (int, error) {
func (self *httpSmartSubtransportStream) Free() {
if self.resp != nil {
// ensure body is fully processed and closed
// for increased likelihood of transport reuse in HTTP/1.x.
// it should not be a problem to do this more than once.
io.Copy(io.Discard, self.resp.Body)
self.resp.Body.Close()
}
}
@ -362,6 +352,11 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
// GET requests will be automatically redirected.
// POST require the new destination, and also the body content.
if req.Method == "POST" && resp.StatusCode >= 301 && resp.StatusCode <= 308 {
// ensure body is fully processed and closed
// for increased likelihood of transport reuse in HTTP/1.x.
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
// The next try will go against the new destination
self.req.URL, err = resp.Location()
if err != nil {
@ -371,15 +366,24 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
continue
}
// for HTTP 200, the response will be cleared up by Free()
if resp.StatusCode == http.StatusOK {
break
}
// ensure body is fully processed and closed
// for increased likelihood of transport reuse in HTTP/1.x.
io.Copy(io.Discard, resp.Body)
defer resp.Body.Close()
resp.Body.Close()
return fmt.Errorf("Unhandled HTTP error %s", resp.Status)
}
if self.owner.httpTransport != nil {
transport.Release(self.owner.httpTransport)
self.owner.httpTransport = nil
}
self.resp = resp
self.sentRequest = true
return nil