libgit2: close discarded connections
Cached connections can be shared across concurrent operations, and their disposal must take that into account to avoid closing a connection that is stale for one goroutine, but is still valid for another. Signed-off-by: Paulo Gomes <paulo.gomes@weave.works>
This commit is contained in:
parent
add07745f3
commit
3ab95a4bf0
|
@ -107,6 +107,11 @@ type sshSmartSubtransport struct {
|
||||||
// aMux is the read-write mutex to control access to sshClients.
|
// aMux is the read-write mutex to control access to sshClients.
|
||||||
var aMux sync.RWMutex
|
var aMux sync.RWMutex
|
||||||
|
|
||||||
|
type cachedClient struct {
|
||||||
|
*ssh.Client
|
||||||
|
activeSessions uint16
|
||||||
|
}
|
||||||
|
|
||||||
// sshClients stores active ssh clients/connections to be reused.
|
// sshClients stores active ssh clients/connections to be reused.
|
||||||
//
|
//
|
||||||
// Once opened, connections will be kept cached until an error occurs
|
// Once opened, connections will be kept cached until an error occurs
|
||||||
|
@ -114,7 +119,7 @@ var aMux sync.RWMutex
|
||||||
// a follow-up cache miss.
|
// a follow-up cache miss.
|
||||||
//
|
//
|
||||||
// The key must be based on cacheKey, refer to that function's comments.
|
// The key must be based on cacheKey, refer to that function's comments.
|
||||||
var sshClients map[string]*ssh.Client = make(map[string]*ssh.Client)
|
var sshClients map[string]*cachedClient = make(map[string]*cachedClient)
|
||||||
|
|
||||||
func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
|
func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
|
||||||
runtime.LockOSThread()
|
runtime.LockOSThread()
|
||||||
|
@ -202,13 +207,14 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
||||||
}
|
}
|
||||||
|
|
||||||
var cacheHit bool
|
var cacheHit bool
|
||||||
aMux.RLock()
|
aMux.Lock()
|
||||||
if c, ok := sshClients[ckey]; ok {
|
if c, ok := sshClients[ckey]; ok {
|
||||||
traceLog.Info("[ssh]: cache hit", "remoteAddress", addr)
|
traceLog.Info("[ssh]: cache hit", "remoteAddress", addr)
|
||||||
t.client = c
|
t.client = c.Client
|
||||||
cacheHit = true
|
cacheHit = true
|
||||||
|
c.activeSessions++
|
||||||
}
|
}
|
||||||
aMux.RUnlock()
|
aMux.Unlock()
|
||||||
|
|
||||||
if t.client == nil {
|
if t.client == nil {
|
||||||
cacheHit = false
|
cacheHit = false
|
||||||
|
@ -298,7 +304,11 @@ func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.Clie
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
sshClients[ckey] = t.client
|
sshClients[ckey] = &cachedClient{
|
||||||
|
Client: t.client,
|
||||||
|
activeSessions: 1,
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -325,6 +335,7 @@ func (t *sshSmartSubtransport) Close() error {
|
||||||
// failure closing a session suggests a stale connection.
|
// failure closing a session suggests a stale connection.
|
||||||
if err != nil && t.ckey != "" {
|
if err != nil && t.ckey != "" {
|
||||||
discardCachedSshClient(t.ckey)
|
discardCachedSshClient(t.ckey)
|
||||||
|
t.ckey = ""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
t.session = nil
|
t.session = nil
|
||||||
|
@ -350,6 +361,13 @@ func (stream *sshSmartSubtransportStream) Write(buf []byte) (int, error) {
|
||||||
|
|
||||||
func (stream *sshSmartSubtransportStream) Free() {
|
func (stream *sshSmartSubtransportStream) Free() {
|
||||||
traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()")
|
traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()")
|
||||||
|
if stream.owner == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if stream.owner.ckey != "" {
|
||||||
|
decrementActiveSessionIfFound(stream.owner.ckey)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, *ssh.ClientConfig, error) {
|
func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, *ssh.ClientConfig, error) {
|
||||||
|
@ -420,8 +438,43 @@ func discardCachedSshClient(key string) {
|
||||||
aMux.Lock()
|
aMux.Lock()
|
||||||
defer aMux.Unlock()
|
defer aMux.Unlock()
|
||||||
|
|
||||||
if _, found := sshClients[key]; found {
|
if v, found := sshClients[key]; found {
|
||||||
traceLog.Info("[ssh]: discard cached ssh client")
|
traceLog.Info("[ssh]: discard cached ssh client")
|
||||||
|
|
||||||
|
v.activeSessions--
|
||||||
|
closeConn := func() {
|
||||||
|
if v.Client != nil {
|
||||||
|
// run as async goroutine to minimise mutex time in immediate closures.
|
||||||
|
go func() {
|
||||||
|
_ = v.Client.Close()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if no active sessions for this connection, close it right-away.
|
||||||
|
// otherwise, it may be used by other processes, so remove from cache,
|
||||||
|
// and schedule a delayed closure.
|
||||||
|
if v.activeSessions == 0 {
|
||||||
|
traceLog.Info("[ssh]: closing connection")
|
||||||
|
closeConn()
|
||||||
|
} else {
|
||||||
|
go func() {
|
||||||
|
// the delay must account for in-flight operations
|
||||||
|
// that depends on this connection.
|
||||||
|
time.Sleep(120 * time.Second)
|
||||||
|
traceLog.Info("[ssh]: closing connection after delay")
|
||||||
|
closeConn()
|
||||||
|
}()
|
||||||
|
}
|
||||||
delete(sshClients, key)
|
delete(sshClients, key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func decrementActiveSessionIfFound(key string) {
|
||||||
|
aMux.Lock()
|
||||||
|
defer aMux.Unlock()
|
||||||
|
|
||||||
|
if v, found := sshClients[key]; found {
|
||||||
|
v.activeSessions--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue