diff --git a/pkg/git/libgit2/managed/ssh.go b/pkg/git/libgit2/managed/ssh.go index a4a732ad..71ef6f39 100644 --- a/pkg/git/libgit2/managed/ssh.go +++ b/pkg/git/libgit2/managed/ssh.go @@ -107,6 +107,11 @@ type sshSmartSubtransport struct { // aMux is the read-write mutex to control access to sshClients. var aMux sync.RWMutex +type cachedClient struct { + *ssh.Client + activeSessions uint16 +} + // sshClients stores active ssh clients/connections to be reused. // // Once opened, connections will be kept cached until an error occurs @@ -114,7 +119,7 @@ var aMux sync.RWMutex // a follow-up cache miss. // // The key must be based on cacheKey, refer to that function's comments. -var sshClients map[string]*ssh.Client = make(map[string]*ssh.Client) +var sshClients map[string]*cachedClient = make(map[string]*cachedClient) func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) { runtime.LockOSThread() @@ -202,13 +207,14 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi } var cacheHit bool - aMux.RLock() + aMux.Lock() if c, ok := sshClients[ckey]; ok { traceLog.Info("[ssh]: cache hit", "remoteAddress", addr) - t.client = c + t.client = c.Client cacheHit = true + c.activeSessions++ } - aMux.RUnlock() + aMux.Unlock() if t.client == nil { cacheHit = false @@ -298,7 +304,11 @@ func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.Clie return err } - sshClients[ckey] = t.client + sshClients[ckey] = &cachedClient{ + Client: t.client, + activeSessions: 1, + } + return nil } @@ -325,6 +335,7 @@ func (t *sshSmartSubtransport) Close() error { // failure closing a session suggests a stale connection. if err != nil && t.ckey != "" { discardCachedSshClient(t.ckey) + t.ckey = "" } } t.session = nil @@ -350,6 +361,13 @@ func (stream *sshSmartSubtransportStream) Write(buf []byte) (int, error) { func (stream *sshSmartSubtransportStream) Free() { traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()") + if stream.owner == nil { + return + } + + if stream.owner.ckey != "" { + decrementActiveSessionIfFound(stream.owner.ckey) + } } func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, *ssh.ClientConfig, error) { @@ -420,8 +438,43 @@ func discardCachedSshClient(key string) { aMux.Lock() defer aMux.Unlock() - if _, found := sshClients[key]; found { + if v, found := sshClients[key]; found { traceLog.Info("[ssh]: discard cached ssh client") + + v.activeSessions-- + closeConn := func() { + if v.Client != nil { + // run as async goroutine to minimise mutex time in immediate closures. + go func() { + _ = v.Client.Close() + }() + } + } + + // if no active sessions for this connection, close it right-away. + // otherwise, it may be used by other processes, so remove from cache, + // and schedule a delayed closure. + if v.activeSessions == 0 { + traceLog.Info("[ssh]: closing connection") + closeConn() + } else { + go func() { + // the delay must account for in-flight operations + // that depends on this connection. + time.Sleep(120 * time.Second) + traceLog.Info("[ssh]: closing connection after delay") + closeConn() + }() + } delete(sshClients, key) } } + +func decrementActiveSessionIfFound(key string) { + aMux.Lock() + defer aMux.Unlock() + + if v, found := sshClients[key]; found { + v.activeSessions-- + } +}