From 69c3f00172fab8b35622953824ee3fa4ee2fc3bd Mon Sep 17 00:00:00 2001 From: Paulo Gomes Date: Wed, 6 Apr 2022 14:18:48 +0100 Subject: [PATCH 1/7] libgit2: retry on stale connections SSH servers that block the reuse of SSH connections for multiple SSH sessions may lead to EOF when a new session is being created. This fixes the issue of long-running connections resulting in EOF for GitLab servers. Signed-off-by: Paulo Gomes --- pkg/git/libgit2/managed/ssh.go | 86 +++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 28 deletions(-) diff --git a/pkg/git/libgit2/managed/ssh.go b/pkg/git/libgit2/managed/ssh.go index 31dd6cdf..367d8bf2 100644 --- a/pkg/git/libgit2/managed/ssh.go +++ b/pkg/git/libgit2/managed/ssh.go @@ -193,51 +193,46 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi return t.transport.SmartCertificateCheck(cert, true, hostname) } + var cacheHit bool aMux.RLock() if c, ok := sshClients[ckey]; ok { traceLog.Info("[ssh]: cache hit", "remoteAddress", addr) t.client = c + cacheHit = true } aMux.RUnlock() if t.client == nil { + cacheHit = false traceLog.Info("[ssh]: cache miss", "remoteAddress", addr) - - aMux.Lock() - defer aMux.Unlock() - - // In some scenarios the ssh handshake can hang indefinitely at - // golang.org/x/crypto/ssh.(*handshakeTransport).kexLoop. - // - // xref: https://github.com/golang/go/issues/51926 - done := make(chan error, 1) - go func() { - t.client, err = ssh.Dial("tcp", addr, sshConfig) - done <- err - }() - - dialTimeout := sshConfig.Timeout + (30 * time.Second) - - select { - case doneErr := <-done: - if doneErr != nil { - err = fmt.Errorf("ssh.Dial: %w", doneErr) - } - case <-time.After(dialTimeout): - err = fmt.Errorf("timed out waiting for ssh.Dial after %s", dialTimeout) - } - + err := t.createConn(ckey, addr, sshConfig) if err != nil { return nil, err } - - sshClients[ckey] = t.client } traceLog.Info("[ssh]: creating new ssh session") if t.session, err = t.client.NewSession(); err != nil { discardCachedSshClient(ckey) - return nil, err + + // if the current connection was cached, and the error is EOF, + // we can try again as this may be a stale connection. + if !(cacheHit && err.Error() == "EOF") { + return nil, err + } + + traceLog.Info("[ssh]: cached connection was stale, retrying...") + err = t.createConn(ckey, addr, sshConfig) + if err != nil { + return nil, err + } + + traceLog.Info("[ssh]: creating new ssh session with new connection") + t.session, err = t.client.NewSession() + if err != nil { + discardCachedSshClient(ckey) + return nil, err + } } if t.stdin, err = t.session.StdinPipe(); err != nil { @@ -264,6 +259,41 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi return t.currentStream, nil } +func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.ClientConfig) error { + aMux.Lock() + defer aMux.Unlock() + + // In some scenarios the ssh handshake can hang indefinitely at + // golang.org/x/crypto/ssh.(*handshakeTransport).kexLoop. + // + // xref: https://github.com/golang/go/issues/51926 + done := make(chan error, 1) + var err error + + go func() { + t.client, err = ssh.Dial("tcp", addr, sshConfig) + done <- err + }() + + dialTimeout := sshConfig.Timeout + (30 * time.Second) + + select { + case doneErr := <-done: + if doneErr != nil { + err = fmt.Errorf("ssh.Dial: %w", doneErr) + } + case <-time.After(dialTimeout): + err = fmt.Errorf("timed out waiting for ssh.Dial after %s", dialTimeout) + } + + if err != nil { + return err + } + + sshClients[ckey] = t.client + return nil +} + func (t *sshSmartSubtransport) Close() error { var returnErr error From 54d0794d1917fe693eca7f4f9b8f01092db01043 Mon Sep 17 00:00:00 2001 From: Paulo Gomes Date: Thu, 7 Apr 2022 04:07:19 +0100 Subject: [PATCH 2/7] libgit2: handle the closing of stale connections Internal and upstream calls to sshSmartSubtransport.Close() when dealing with an stale connection, may lead to misleading errors. Focus should instead be redirected to ensuring that Close() releases resources and ensures that a new SubTransport can be created, so new operations can succeed. Signed-off-by: Paulo Gomes --- pkg/git/libgit2/managed/ssh.go | 37 ++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/pkg/git/libgit2/managed/ssh.go b/pkg/git/libgit2/managed/ssh.go index 367d8bf2..4c1f781d 100644 --- a/pkg/git/libgit2/managed/ssh.go +++ b/pkg/git/libgit2/managed/ssh.go @@ -101,6 +101,7 @@ type sshSmartSubtransport struct { stdin io.WriteCloser stdout io.Reader currentStream *sshSmartSubtransportStream + ckey string } // aMux is the read-write mutex to control access to sshClients. @@ -138,9 +139,8 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi if t.lastAction == git2go.SmartServiceActionUploadpackLs { return t.currentStream, nil } - if err := t.Close(); err != nil { - traceLog.Error(err, "[ssh]: error cleaning up previous stream") - } + // Disregard errors from previous stream, futher details inside Close(). + _ = t.Close() } cmd = fmt.Sprintf("git-upload-pack '%s'", uPath) @@ -149,9 +149,8 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi if t.lastAction == git2go.SmartServiceActionReceivepackLs { return t.currentStream, nil } - if err := t.Close(); err != nil { - traceLog.Error(err, "[ssh]: error cleaning up previous stream") - } + // Disregard errors from previous stream, futher details inside Close(). + _ = t.Close() } cmd = fmt.Sprintf("git-receive-pack '%s'", uPath) @@ -176,6 +175,8 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi if err != nil { return nil, err } + t.ckey = ckey + sshConfig.HostKeyCallback = func(hostname string, remote net.Addr, key ssh.PublicKey) error { marshaledKey := key.Marshal() cert := &git2go.Certificate{ @@ -294,28 +295,34 @@ func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.Clie return nil } +// Close closes the smart subtransport. +// +// This is called internally ahead of a new action, and also +// upstream by the transport handler: +// https://github.com/libgit2/git2go/blob/0e8009f00a65034d196c67b1cdd82af6f12c34d3/transport.go#L409 +// +// Avoid returning errors, but focus on releasing anything that +// may impair the transport to have successful actions on a new +// SmartSubTransport (i.e. unreleased resources, staled connections). func (t *sshSmartSubtransport) Close() error { - var returnErr error - traceLog.Info("[ssh]: sshSmartSubtransport.Close()") t.currentStream = nil if t.client != nil && t.stdin != nil { - if err := t.stdin.Close(); err != nil { - returnErr = fmt.Errorf("cannot close stdin: %w", err) - } + _ = t.stdin.Close() } t.client = nil if t.session != nil { - traceLog.Info("[ssh]: skipping session.wait") traceLog.Info("[ssh]: session.Close()") - if err := t.session.Close(); err != nil { - returnErr = fmt.Errorf("cannot close session: %w", err) + err := t.session.Close() + // failure closing a session suggests a stale connection. + if err != nil && t.ckey != "" { + discardCachedSshClient(t.ckey) } } t.session = nil - return returnErr + return nil } func (t *sshSmartSubtransport) Free() { From d86f0a280a702b2944df6b534219cb9ada3a01a4 Mon Sep 17 00:00:00 2001 From: Paulo Gomes Date: Thu, 7 Apr 2022 05:42:15 +0100 Subject: [PATCH 3/7] libgit2: validate URL max length The major Git SaaS providers have repository URLs for both HTTP and SSH that tops around 250 characters in length. The limits chosen were a lot higher to align with use cases in which users may have on-premise servers with long domain names and paths. For SSH the validation is around path length only, which is now limited to 4096 characters, which is at the higher end of the range in Linux. For HTTP the validation is around the full URL provided by the caller. Signed-off-by: Paulo Gomes --- pkg/git/libgit2/managed/http.go | 4 ++++ pkg/git/libgit2/managed/ssh.go | 17 ++++++++++++----- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/pkg/git/libgit2/managed/http.go b/pkg/git/libgit2/managed/http.go index 04e1c54b..8a58dbb8 100644 --- a/pkg/git/libgit2/managed/http.go +++ b/pkg/git/libgit2/managed/http.go @@ -171,6 +171,10 @@ func createClientRequest(targetUrl string, action git2go.SmartServiceAction, t * } } + if len(finalUrl) > 2048 { + return nil, nil, fmt.Errorf("URL exceeds the max length (2048)") + } + client := &http.Client{ Transport: t, Timeout: fullHttpClientTimeOut, diff --git a/pkg/git/libgit2/managed/ssh.go b/pkg/git/libgit2/managed/ssh.go index 4c1f781d..d6272e85 100644 --- a/pkg/git/libgit2/managed/ssh.go +++ b/pkg/git/libgit2/managed/ssh.go @@ -125,12 +125,19 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi return nil, err } - // Escape \ and '. - uPath := strings.Replace(u.Path, `\`, `\\`, -1) - uPath = strings.Replace(uPath, `'`, `\'`, -1) + if len(u.Path) > 4096 { + return nil, fmt.Errorf("path exceeds the max length (4096)") + } - // TODO: Add percentage decode similar to libgit2. - // Refer: https://github.com/libgit2/libgit2/blob/358a60e1b46000ea99ef10b4dd709e92f75ff74b/src/str.c#L455-L481 + // decode URI's path + uPath, err := url.PathUnescape(u.Path) + if err != nil { + return nil, err + } + + // Escape \ and '. + uPath = strings.Replace(uPath, `\`, `\\`, -1) + uPath = strings.Replace(uPath, `'`, `\'`, -1) var cmd string switch action { From add07745f3a95f7b844ddbe049413f63f210c22b Mon Sep 17 00:00:00 2001 From: Paulo Gomes Date: Thu, 7 Apr 2022 05:49:00 +0100 Subject: [PATCH 4/7] libgit2: restrict smart creds to Type SSH Memory Avoid asking for SSH credential in files, as they won't be used. The cacheKeyAndConfig func already enforces this behaviour. Signed-off-by: Paulo Gomes --- pkg/git/libgit2/managed/ssh.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/git/libgit2/managed/ssh.go b/pkg/git/libgit2/managed/ssh.go index d6272e85..a4a732ad 100644 --- a/pkg/git/libgit2/managed/ssh.go +++ b/pkg/git/libgit2/managed/ssh.go @@ -165,7 +165,7 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi return nil, fmt.Errorf("unexpected action: %v", action) } - cred, err := t.transport.SmartCredentials("", git2go.CredentialTypeSSHKey|git2go.CredentialTypeSSHMemory) + cred, err := t.transport.SmartCredentials("", git2go.CredentialTypeSSHMemory) if err != nil { return nil, err } From 3ab95a4bf0fe43bcdac756489753ab937c1dfdd1 Mon Sep 17 00:00:00 2001 From: Paulo Gomes Date: Thu, 7 Apr 2022 06:13:18 +0100 Subject: [PATCH 5/7] libgit2: close discarded connections Cached connections can be shared across concurrent operations, and their disposal must take that into account to avoid closing a connection that is stale for one goroutine, but is still valid for another. Signed-off-by: Paulo Gomes --- pkg/git/libgit2/managed/ssh.go | 65 ++++++++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/pkg/git/libgit2/managed/ssh.go b/pkg/git/libgit2/managed/ssh.go index a4a732ad..71ef6f39 100644 --- a/pkg/git/libgit2/managed/ssh.go +++ b/pkg/git/libgit2/managed/ssh.go @@ -107,6 +107,11 @@ type sshSmartSubtransport struct { // aMux is the read-write mutex to control access to sshClients. var aMux sync.RWMutex +type cachedClient struct { + *ssh.Client + activeSessions uint16 +} + // sshClients stores active ssh clients/connections to be reused. // // Once opened, connections will be kept cached until an error occurs @@ -114,7 +119,7 @@ var aMux sync.RWMutex // a follow-up cache miss. // // The key must be based on cacheKey, refer to that function's comments. -var sshClients map[string]*ssh.Client = make(map[string]*ssh.Client) +var sshClients map[string]*cachedClient = make(map[string]*cachedClient) func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) { runtime.LockOSThread() @@ -202,13 +207,14 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi } var cacheHit bool - aMux.RLock() + aMux.Lock() if c, ok := sshClients[ckey]; ok { traceLog.Info("[ssh]: cache hit", "remoteAddress", addr) - t.client = c + t.client = c.Client cacheHit = true + c.activeSessions++ } - aMux.RUnlock() + aMux.Unlock() if t.client == nil { cacheHit = false @@ -298,7 +304,11 @@ func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.Clie return err } - sshClients[ckey] = t.client + sshClients[ckey] = &cachedClient{ + Client: t.client, + activeSessions: 1, + } + return nil } @@ -325,6 +335,7 @@ func (t *sshSmartSubtransport) Close() error { // failure closing a session suggests a stale connection. if err != nil && t.ckey != "" { discardCachedSshClient(t.ckey) + t.ckey = "" } } t.session = nil @@ -350,6 +361,13 @@ func (stream *sshSmartSubtransportStream) Write(buf []byte) (int, error) { func (stream *sshSmartSubtransportStream) Free() { traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()") + if stream.owner == nil { + return + } + + if stream.owner.ckey != "" { + decrementActiveSessionIfFound(stream.owner.ckey) + } } func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, *ssh.ClientConfig, error) { @@ -420,8 +438,43 @@ func discardCachedSshClient(key string) { aMux.Lock() defer aMux.Unlock() - if _, found := sshClients[key]; found { + if v, found := sshClients[key]; found { traceLog.Info("[ssh]: discard cached ssh client") + + v.activeSessions-- + closeConn := func() { + if v.Client != nil { + // run as async goroutine to minimise mutex time in immediate closures. + go func() { + _ = v.Client.Close() + }() + } + } + + // if no active sessions for this connection, close it right-away. + // otherwise, it may be used by other processes, so remove from cache, + // and schedule a delayed closure. + if v.activeSessions == 0 { + traceLog.Info("[ssh]: closing connection") + closeConn() + } else { + go func() { + // the delay must account for in-flight operations + // that depends on this connection. + time.Sleep(120 * time.Second) + traceLog.Info("[ssh]: closing connection after delay") + closeConn() + }() + } delete(sshClients, key) } } + +func decrementActiveSessionIfFound(key string) { + aMux.Lock() + defer aMux.Unlock() + + if v, found := sshClients[key]; found { + v.activeSessions-- + } +} From b264a3513dea0030e6add680cc14ebc891f2e048 Mon Sep 17 00:00:00 2001 From: Paulo Gomes Date: Thu, 7 Apr 2022 14:45:50 +0100 Subject: [PATCH 6/7] libgit2: refactor max length values into constants Signed-off-by: Paulo Gomes --- pkg/git/libgit2/managed/const.go | 27 +++++++++++++++++++++++++++ pkg/git/libgit2/managed/http.go | 4 ++-- pkg/git/libgit2/managed/ssh.go | 4 ++-- 3 files changed, 31 insertions(+), 4 deletions(-) create mode 100644 pkg/git/libgit2/managed/const.go diff --git a/pkg/git/libgit2/managed/const.go b/pkg/git/libgit2/managed/const.go new file mode 100644 index 00000000..f41035da --- /dev/null +++ b/pkg/git/libgit2/managed/const.go @@ -0,0 +1,27 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package managed + +const ( + // URLMaxLength represents the max length for the entire URL + // when cloning Git repositories via HTTP(S). + URLMaxLength = 2048 + + // PathMaxLength represents the max length for the path element + // when cloning Git repositories via SSH. + PathMaxLength = 4096 +) diff --git a/pkg/git/libgit2/managed/http.go b/pkg/git/libgit2/managed/http.go index 8a58dbb8..09c0ee26 100644 --- a/pkg/git/libgit2/managed/http.go +++ b/pkg/git/libgit2/managed/http.go @@ -171,8 +171,8 @@ func createClientRequest(targetUrl string, action git2go.SmartServiceAction, t * } } - if len(finalUrl) > 2048 { - return nil, nil, fmt.Errorf("URL exceeds the max length (2048)") + if len(finalUrl) > URLMaxLength { + return nil, nil, fmt.Errorf("URL exceeds the max length (%d)", URLMaxLength) } client := &http.Client{ diff --git a/pkg/git/libgit2/managed/ssh.go b/pkg/git/libgit2/managed/ssh.go index 71ef6f39..82acd1da 100644 --- a/pkg/git/libgit2/managed/ssh.go +++ b/pkg/git/libgit2/managed/ssh.go @@ -130,8 +130,8 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi return nil, err } - if len(u.Path) > 4096 { - return nil, fmt.Errorf("path exceeds the max length (4096)") + if len(u.Path) > PathMaxLength { + return nil, fmt.Errorf("path exceeds the max length (%d)", PathMaxLength) } // decode URI's path From 68eece48fb36133669f926a739aefd3519be4ba5 Mon Sep 17 00:00:00 2001 From: Paulo Gomes Date: Thu, 7 Apr 2022 17:47:04 +0100 Subject: [PATCH 7/7] libgit2: optimise mutex on cached connections Previously the mutex.Lock was acquired before creating a new connection. The lock would then hold until the process was finished, and all network latency would be absorbed by other goroutines trying to establish a new connection. Now the lock is acquired after the connection has been created. The downside of this approach is that concurrent goroutine may be trying to open a connection to the same target. The loser in the race will then have to Close the connection and use the winner's instead. Signed-off-by: Paulo Gomes --- pkg/git/libgit2/managed/ssh.go | 57 ++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/pkg/git/libgit2/managed/ssh.go b/pkg/git/libgit2/managed/ssh.go index 82acd1da..a36ac166 100644 --- a/pkg/git/libgit2/managed/ssh.go +++ b/pkg/git/libgit2/managed/ssh.go @@ -102,6 +102,7 @@ type sshSmartSubtransport struct { stdout io.Reader currentStream *sshSmartSubtransportStream ckey string + addr string } // aMux is the read-write mutex to control access to sshClients. @@ -182,6 +183,7 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi port = u.Port() } addr = fmt.Sprintf("%s:%s", u.Hostname(), port) + t.addr = addr ckey, sshConfig, err := cacheKeyAndConfig(addr, cred) if err != nil { @@ -229,9 +231,9 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi if t.session, err = t.client.NewSession(); err != nil { discardCachedSshClient(ckey) - // if the current connection was cached, and the error is EOF, - // we can try again as this may be a stale connection. - if !(cacheHit && err.Error() == "EOF") { + // if the current connection was cached, we can try again + // as this may be a stale connection. + if !cacheHit { return nil, err } @@ -274,9 +276,6 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi } func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.ClientConfig) error { - aMux.Lock() - defer aMux.Unlock() - // In some scenarios the ssh handshake can hang indefinitely at // golang.org/x/crypto/ssh.(*handshakeTransport).kexLoop. // @@ -284,8 +283,9 @@ func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.Clie done := make(chan error, 1) var err error + var c *ssh.Client go func() { - t.client, err = ssh.Dial("tcp", addr, sshConfig) + c, err = ssh.Dial("tcp", addr, sshConfig) done <- err }() @@ -304,8 +304,24 @@ func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.Clie return err } + t.client = c + + // Mutex is set here to avoid the network latency being + // absorbed by all competing goroutines. + aMux.Lock() + defer aMux.Unlock() + + // A different goroutine won the race, dispose the connection + // and carry on. + if _, ok := sshClients[ckey]; ok { + go func() { + _ = c.Close() + }() + return nil + } + sshClients[ckey] = &cachedClient{ - Client: t.client, + Client: c, activeSessions: 1, } @@ -322,7 +338,7 @@ func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.Clie // may impair the transport to have successful actions on a new // SmartSubTransport (i.e. unreleased resources, staled connections). func (t *sshSmartSubtransport) Close() error { - traceLog.Info("[ssh]: sshSmartSubtransport.Close()") + traceLog.Info("[ssh]: sshSmartSubtransport.Close()", "server", t.addr) t.currentStream = nil if t.client != nil && t.stdin != nil { _ = t.stdin.Close() @@ -330,13 +346,8 @@ func (t *sshSmartSubtransport) Close() error { t.client = nil if t.session != nil { - traceLog.Info("[ssh]: session.Close()") - err := t.session.Close() - // failure closing a session suggests a stale connection. - if err != nil && t.ckey != "" { - discardCachedSshClient(t.ckey) - t.ckey = "" - } + traceLog.Info("[ssh]: session.Close()", "server", t.addr) + _ = t.session.Close() } t.session = nil @@ -439,16 +450,14 @@ func discardCachedSshClient(key string) { defer aMux.Unlock() if v, found := sshClients[key]; found { - traceLog.Info("[ssh]: discard cached ssh client") - - v.activeSessions-- + traceLog.Info("[ssh]: discard cached ssh client", "activeSessions", v.activeSessions) closeConn := func() { - if v.Client != nil { - // run as async goroutine to minimise mutex time in immediate closures. - go func() { + // run as async goroutine to minimise mutex time in immediate closures. + go func() { + if v.Client != nil { _ = v.Client.Close() - }() - } + } + }() } // if no active sessions for this connection, close it right-away.