libgit2: improve subtransport logging

Debugging connection issues can be extremely difficult, even more so at scale or when
concurrent connections are required to trigger specific issues.

Changes:
- Add a correlation identifier for each reconciliation, which allows for greater traceability when
going through all the reconciliation operations - including at transport level.
- Add transportType to segregate HTTP and SSH transport logging.
- SSH operations are now enriched with addr containing server address, and HTTP url.

Signed-off-by: Paulo Gomes <paulo.gomes@weave.works>
This commit is contained in:
Paulo Gomes 2022-06-13 07:35:12 +01:00
parent 7a797e3b57
commit cea9ea9142
No known key found for this signature in database
GPG Key ID: 9995233870E99BEE
10 changed files with 99 additions and 47 deletions

View File

@ -29,6 +29,7 @@ import (
securejoin "github.com/cyphar/filepath-securejoin" securejoin "github.com/cyphar/filepath-securejoin"
"github.com/fluxcd/pkg/runtime/logger" "github.com/fluxcd/pkg/runtime/logger"
"github.com/google/uuid"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
@ -159,7 +160,13 @@ func (r *GitRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, o
func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) { func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now() start := time.Now()
log := ctrl.LoggerFrom(ctx) log := ctrl.LoggerFrom(ctx).
// Sets a correlation ID for all transport level logs.
WithValues("cid", uuid.New())
// logger will be associated to the new context that is
// returned from ctrl.LoggerInto.
ctx = ctrl.LoggerInto(ctx, log)
// Fetch the GitRepository // Fetch the GitRepository
obj := &sourcev1.GitRepository{} obj := &sourcev1.GitRepository{}

View File

@ -40,7 +40,6 @@ import (
feathelper "github.com/fluxcd/pkg/runtime/features" feathelper "github.com/fluxcd/pkg/runtime/features"
"github.com/fluxcd/pkg/runtime/testenv" "github.com/fluxcd/pkg/runtime/testenv"
"github.com/fluxcd/pkg/testserver" "github.com/fluxcd/pkg/testserver"
"github.com/go-logr/logr"
"github.com/phayes/freeport" "github.com/phayes/freeport"
"github.com/distribution/distribution/v3/configuration" "github.com/distribution/distribution/v3/configuration"
@ -209,7 +208,7 @@ func TestMain(m *testing.M) {
fg := feathelper.FeatureGates{} fg := feathelper.FeatureGates{}
fg.SupportedFeatures(features.FeatureGates()) fg.SupportedFeatures(features.FeatureGates())
managed.InitManagedTransport(logr.Discard()) managed.InitManagedTransport()
if err := (&GitRepositoryReconciler{ if err := (&GitRepositoryReconciler{
Client: testEnv, Client: testEnv,

View File

@ -311,7 +311,7 @@ func main() {
}() }()
if enabled, _ := features.Enabled(features.GitManagedTransport); enabled { if enabled, _ := features.Enabled(features.GitManagedTransport); enabled {
managed.InitManagedTransport(ctrl.Log.WithName("managed-transport")) managed.InitManagedTransport()
} else { } else {
if optimize, _ := feathelper.Enabled(features.OptimizedGitClones); optimize { if optimize, _ := feathelper.Enabled(features.OptimizedGitClones); optimize {
features.Disable(features.OptimizedGitClones) features.Disable(features.OptimizedGitClones)

View File

@ -45,6 +45,7 @@ package managed
import ( import (
"bytes" "bytes"
"context"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"errors" "errors"
@ -55,9 +56,12 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/fluxcd/pkg/runtime/logger"
pool "github.com/fluxcd/source-controller/internal/transport" pool "github.com/fluxcd/source-controller/internal/transport"
"github.com/fluxcd/source-controller/pkg/git" "github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
git2go "github.com/libgit2/git2go/v33" git2go "github.com/libgit2/git2go/v33"
ctrl "sigs.k8s.io/controller-runtime"
) )
var actionSuffixes = []string{ var actionSuffixes = []string{
@ -81,10 +85,11 @@ func registerManagedHTTP() error {
} }
func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) { func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) {
traceLog.Info("[http]: httpSmartSubtransportFactory")
sst := &httpSmartSubtransport{ sst := &httpSmartSubtransport{
transport: transport, transport: transport,
httpTransport: pool.NewOrIdle(nil), httpTransport: pool.NewOrIdle(nil),
ctx: context.Background(),
logger: logr.Discard(),
} }
return sst, nil return sst, nil
@ -93,6 +98,21 @@ func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Trans
type httpSmartSubtransport struct { type httpSmartSubtransport struct {
transport *git2go.Transport transport *git2go.Transport
httpTransport *http.Transport httpTransport *http.Transport
// once is used to ensure that logger and ctx is set only once,
// on the initial (or only) Action call. Without this a mutex must
// be applied to ensure that ctx won't be changed, as this would be
// prone to race conditions in the stdout processing goroutine.
once sync.Once
// ctx defines the context to be used across long-running or
// cancellable operations.
// Defaults to context.Background().
ctx context.Context
// logger keeps a Logger instance for logging. This was preferred
// due to the need to have a correlation ID and URL set and
// reused across all log calls.
// If context is not set, this defaults to logr.Discard().
logger logr.Logger
} }
func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) { func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
@ -133,6 +153,15 @@ func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go
} }
t.httpTransport.DisableCompression = false t.httpTransport.DisableCompression = false
t.once.Do(func() {
if opts.Context != nil {
t.ctx = opts.Context
t.logger = ctrl.LoggerFrom(t.ctx,
"transportType", "http",
"url", opts.TargetURL)
}
})
client, req, err := createClientRequest(targetURL, action, t.httpTransport, opts.AuthOpts) client, req, err := createClientRequest(targetURL, action, t.httpTransport, opts.AuthOpts)
if err != nil { if err != nil {
return nil, err return nil, err
@ -176,8 +205,10 @@ func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go
opts.TargetURL = trimActionSuffix(newURL.String()) opts.TargetURL = trimActionSuffix(newURL.String())
AddTransportOptions(transportOptionsURL, *opts) AddTransportOptions(transportOptionsURL, *opts)
debugLog.Info("[http]: server responded with redirect", // show as info, as this should be visible regardless of the
"newURL", opts.TargetURL, "StatusCode", req.Response.StatusCode) // chosen log-level.
t.logger.Info("server responded with redirect",
"newUrl", opts.TargetURL, "StatusCode", req.Response.StatusCode)
} }
} }
} }
@ -270,15 +301,16 @@ func createClientRequest(targetURL string, action git2go.SmartServiceAction,
} }
func (t *httpSmartSubtransport) Close() error { func (t *httpSmartSubtransport) Close() error {
traceLog.Info("[http]: httpSmartSubtransport.Close()") t.logger.V(logger.TraceLevel).Info("httpSmartSubtransport.Close()")
return nil return nil
} }
func (t *httpSmartSubtransport) Free() { func (t *httpSmartSubtransport) Free() {
traceLog.Info("[http]: httpSmartSubtransport.Free()") t.logger.V(logger.TraceLevel).Info("httpSmartSubtransport.Free()")
if t.httpTransport != nil { if t.httpTransport != nil {
traceLog.Info("[http]: release http transport back to pool") t.logger.V(logger.TraceLevel).Info("release http transport back to pool")
pool.Release(t.httpTransport) pool.Release(t.httpTransport)
t.httpTransport = nil t.httpTransport = nil
} }
@ -345,18 +377,18 @@ func (self *httpSmartSubtransportStream) Write(buf []byte) (int, error) {
func (self *httpSmartSubtransportStream) Free() { func (self *httpSmartSubtransportStream) Free() {
if self.resp != nil { if self.resp != nil {
traceLog.Info("[http]: httpSmartSubtransportStream.Free()") self.owner.logger.V(logger.TraceLevel).Info("httpSmartSubtransportStream.Free()")
if self.resp.Body != nil { if self.resp.Body != nil {
// ensure body is fully processed and closed // ensure body is fully processed and closed
// for increased likelihood of transport reuse in HTTP/1.x. // for increased likelihood of transport reuse in HTTP/1.x.
// it should not be a problem to do this more than once. // it should not be a problem to do this more than once.
if _, err := io.Copy(io.Discard, self.resp.Body); err != nil { if _, err := io.Copy(io.Discard, self.resp.Body); err != nil {
traceLog.Error(err, "[http]: cannot discard response body") self.owner.logger.V(logger.TraceLevel).Error(err, "cannot discard response body")
} }
if err := self.resp.Body.Close(); err != nil { if err := self.resp.Body.Close(); err != nil {
traceLog.Error(err, "[http]: cannot close response body") self.owner.logger.V(logger.TraceLevel).Error(err, "cannot close response body")
} }
} }
} }
@ -399,7 +431,7 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
req.ContentLength = -1 req.ContentLength = -1
} }
traceLog.Info("[http]: new request", "method", req.Method, "URL", req.URL) self.owner.logger.V(logger.TraceLevel).Info("new request", "method", req.Method, "postUrl", req.URL)
resp, err = self.client.Do(req) resp, err = self.client.Do(req)
if err != nil { if err != nil {
return err return err

View File

@ -25,7 +25,6 @@ import (
"github.com/fluxcd/pkg/gittestserver" "github.com/fluxcd/pkg/gittestserver"
"github.com/fluxcd/source-controller/pkg/git" "github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
git2go "github.com/libgit2/git2go/v33" git2go "github.com/libgit2/git2go/v33"
@ -170,7 +169,7 @@ func TestHTTPManagedTransport_E2E(t *testing.T) {
defer server.StopHTTP() defer server.StopHTTP()
// Force managed transport to be enabled // Force managed transport to be enabled
InitManagedTransport(logr.Discard()) InitManagedTransport()
repoPath := "test.git" repoPath := "test.git"
err = server.InitRepo("../../testdata/git/repo", git.DefaultBranch, repoPath) err = server.InitRepo("../../testdata/git/repo", git.DefaultBranch, repoPath)
@ -253,7 +252,7 @@ func TestHTTPManagedTransport_HandleRedirect(t *testing.T) {
} }
// Force managed transport to be enabled // Force managed transport to be enabled
InitManagedTransport(logr.Discard()) InitManagedTransport()
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {

View File

@ -19,9 +19,6 @@ package managed
import ( import (
"sync" "sync"
"time" "time"
"github.com/fluxcd/pkg/runtime/logger"
"github.com/go-logr/logr"
) )
var ( var (
@ -38,9 +35,7 @@ var (
// handshake, put/get). // handshake, put/get).
fullHttpClientTimeOut time.Duration = 10 * time.Minute fullHttpClientTimeOut time.Duration = 10 * time.Minute
debugLog logr.Logger enabled bool
traceLog logr.Logger
enabled bool
) )
// Enabled defines whether the use of Managed Transport is enabled which // Enabled defines whether the use of Managed Transport is enabled which
@ -63,14 +58,10 @@ func Enabled() bool {
// //
// This function will only register managed transports once, subsequent calls // This function will only register managed transports once, subsequent calls
// leads to no-op. // leads to no-op.
func InitManagedTransport(log logr.Logger) error { func InitManagedTransport() error {
var err error var err error
once.Do(func() { once.Do(func() {
log.Info("Initializing managed transport")
debugLog = log.V(logger.DebugLevel)
traceLog = log.V(logger.TraceLevel)
if err = registerManagedHTTP(); err != nil { if err = registerManagedHTTP(); err != nil {
return return
} }

View File

@ -58,8 +58,11 @@ import (
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
"golang.org/x/net/proxy" "golang.org/x/net/proxy"
ctrl "sigs.k8s.io/controller-runtime"
"github.com/fluxcd/pkg/runtime/logger"
"github.com/fluxcd/source-controller/pkg/git" "github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
git2go "github.com/libgit2/git2go/v33" git2go "github.com/libgit2/git2go/v33"
) )
@ -79,17 +82,32 @@ func registerManagedSSH() error {
func sshSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) { func sshSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) {
return &sshSmartSubtransport{ return &sshSmartSubtransport{
transport: transport, transport: transport,
ctx: context.Background(),
logger: logr.Discard(),
}, nil }, nil
} }
type sshSmartSubtransport struct { type sshSmartSubtransport struct {
transport *git2go.Transport transport *git2go.Transport
// once is used to ensure that logger and ctx is set only once,
// on the initial (or only) Action call. Without this a mutex must
// be applied to ensure that ctx won't be changed, as this would be
// prone to race conditions in the stdout processing goroutine.
once sync.Once
// ctx defines the context to be used across long-running or
// cancellable operations.
// Defaults to context.Background().
ctx context.Context
// logger keeps a Logger instance for logging. This was preferred
// due to the need to have a correlation ID and Address set and
// reused across all log calls.
// If context is not set, this defaults to logr.Discard().
logger logr.Logger
lastAction git2go.SmartServiceAction lastAction git2go.SmartServiceAction
stdin io.WriteCloser stdin io.WriteCloser
stdout io.Reader stdout io.Reader
addr string
ctx context.Context
con connection con connection
} }
@ -111,8 +129,6 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
return nil, fmt.Errorf("could not find transport options for object: %s", transportOptionsURL) return nil, fmt.Errorf("could not find transport options for object: %s", transportOptionsURL)
} }
t.ctx = opts.Context
u, err := url.Parse(opts.TargetURL) u, err := url.Parse(opts.TargetURL)
if err != nil { if err != nil {
return nil, err return nil, err
@ -158,7 +174,16 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
if u.Port() != "" { if u.Port() != "" {
port = u.Port() port = u.Port()
} }
t.addr = net.JoinHostPort(u.Hostname(), port) addr := net.JoinHostPort(u.Hostname(), port)
t.once.Do(func() {
if opts.Context != nil {
t.ctx = opts.Context
t.logger = ctrl.LoggerFrom(t.ctx,
"transportType", "ssh",
"addr", addr)
}
})
sshConfig, err := createClientConfig(opts.AuthOpts) sshConfig, err := createClientConfig(opts.AuthOpts)
if err != nil { if err != nil {
@ -191,12 +216,12 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
} }
t.con.m.RUnlock() t.con.m.RUnlock()
err = t.createConn(t.addr, sshConfig) err = t.createConn(addr, sshConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
traceLog.Info("[ssh]: creating new ssh session") t.logger.V(logger.TraceLevel).Info("creating new ssh session")
if t.con.session, err = t.con.client.NewSession(); err != nil { if t.con.session, err = t.con.client.NewSession(); err != nil {
return nil, err return nil, err
} }
@ -222,8 +247,8 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
// In case this goroutine panics, handle recovery. // In case this goroutine panics, handle recovery.
if r := recover(); r != nil { if r := recover(); r != nil {
traceLog.Error(errors.New(r.(string)), t.logger.V(logger.TraceLevel).Error(errors.New(r.(string)),
"[ssh]: recovered from libgit2 ssh smart subtransport panic", "address", t.addr) "recovered from libgit2 ssh smart subtransport panic")
} }
}() }()
@ -259,7 +284,7 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
} }
}() }()
traceLog.Info("[ssh]: run on remote", "cmd", cmd) t.logger.V(logger.TraceLevel).Info("run on remote", "cmd", cmd)
if err := t.con.session.Start(cmd); err != nil { if err := t.con.session.Start(cmd); err != nil {
return nil, err return nil, err
} }
@ -276,6 +301,7 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
ctx, cancel := context.WithTimeout(context.TODO(), sshConnectionTimeOut) ctx, cancel := context.WithTimeout(context.TODO(), sshConnectionTimeOut)
defer cancel() defer cancel()
t.logger.V(logger.TraceLevel).Info("dial connection")
conn, err := proxy.Dial(ctx, "tcp", addr) conn, err := proxy.Dial(ctx, "tcp", addr)
if err != nil { if err != nil {
return err return err
@ -303,9 +329,10 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
// may impair the transport to have successful actions on a new // may impair the transport to have successful actions on a new
// SmartSubTransport (i.e. unreleased resources, staled connections). // SmartSubTransport (i.e. unreleased resources, staled connections).
func (t *sshSmartSubtransport) Close() error { func (t *sshSmartSubtransport) Close() error {
traceLog.Info("[ssh]: sshSmartSubtransport.Close()", "server", t.addr) t.logger.V(logger.TraceLevel).Info("sshSmartSubtransport.Close()")
t.con.m.Lock() t.con.m.Lock()
defer t.con.m.Unlock() defer t.con.m.Unlock()
t.con.currentStream = nil t.con.currentStream = nil
if t.con.client != nil && t.stdin != nil { if t.con.client != nil && t.stdin != nil {
_ = t.stdin.Close() _ = t.stdin.Close()
@ -313,13 +340,14 @@ func (t *sshSmartSubtransport) Close() error {
t.stdin = nil t.stdin = nil
if t.con.session != nil { if t.con.session != nil {
traceLog.Info("[ssh]: session.Close()", "server", t.addr) t.logger.V(logger.TraceLevel).Info("session.Close()")
_ = t.con.session.Close() _ = t.con.session.Close()
} }
t.con.session = nil t.con.session = nil
if t.con.client != nil { if t.con.client != nil {
_ = t.con.client.Close() _ = t.con.client.Close()
t.logger.V(logger.TraceLevel).Info("close client")
} }
t.con.connected = false t.con.connected = false
@ -343,7 +371,6 @@ func (stream *sshSmartSubtransportStream) Write(buf []byte) (int, error) {
} }
func (stream *sshSmartSubtransportStream) Free() { func (stream *sshSmartSubtransportStream) Free() {
traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()")
} }
func createClientConfig(authOpts *git.AuthOptions) (*ssh.ClientConfig, error) { func createClientConfig(authOpts *git.AuthOptions) (*ssh.ClientConfig, error) {

View File

@ -23,7 +23,6 @@ import (
"github.com/fluxcd/pkg/ssh" "github.com/fluxcd/pkg/ssh"
"github.com/fluxcd/source-controller/pkg/git" "github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/fluxcd/pkg/gittestserver" "github.com/fluxcd/pkg/gittestserver"
@ -89,7 +88,7 @@ func TestSSHManagedTransport_E2E(t *testing.T) {
server.StartSSH() server.StartSSH()
}() }()
defer server.StopSSH() defer server.StopSSH()
InitManagedTransport(logr.Discard()) InitManagedTransport()
kp, err := ssh.NewEd25519Generator().Generate() kp, err := ssh.NewEd25519Generator().Generate()
g.Expect(err).ToNot(HaveOccurred()) g.Expect(err).ToNot(HaveOccurred())

View File

@ -30,7 +30,6 @@ import (
"github.com/fluxcd/gitkit" "github.com/fluxcd/gitkit"
"github.com/fluxcd/pkg/gittestserver" "github.com/fluxcd/pkg/gittestserver"
"github.com/fluxcd/pkg/ssh" "github.com/fluxcd/pkg/ssh"
"github.com/go-logr/logr"
feathelper "github.com/fluxcd/pkg/runtime/features" feathelper "github.com/fluxcd/pkg/runtime/features"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
@ -471,5 +470,5 @@ func getTransportOptionsURL(transport git.TransportType) string {
func enableManagedTransport() { func enableManagedTransport() {
fg := feathelper.FeatureGates{} fg := feathelper.FeatureGates{}
fg.SupportedFeatures(features.FeatureGates()) fg.SupportedFeatures(features.FeatureGates())
managed.InitManagedTransport(logr.Discard()) managed.InitManagedTransport()
} }

View File

@ -30,7 +30,6 @@ import (
"github.com/elazarl/goproxy" "github.com/elazarl/goproxy"
"github.com/fluxcd/pkg/gittestserver" "github.com/fluxcd/pkg/gittestserver"
feathelper "github.com/fluxcd/pkg/runtime/features" feathelper "github.com/fluxcd/pkg/runtime/features"
"github.com/go-logr/logr"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/fluxcd/source-controller/internal/features" "github.com/fluxcd/source-controller/internal/features"
@ -50,7 +49,7 @@ func TestCheckoutStrategyForImplementation_Proxied(t *testing.T) {
fg := feathelper.FeatureGates{} fg := feathelper.FeatureGates{}
fg.SupportedFeatures(features.FeatureGates()) fg.SupportedFeatures(features.FeatureGates())
managed.InitManagedTransport(logr.Discard()) managed.InitManagedTransport()
type cleanupFunc func() type cleanupFunc func()