libgit2: optimise mutex on cached connections
Previously the mutex.Lock was acquired before creating a new connection. The lock would then hold until the process was finished, and all network latency would be absorbed by other goroutines trying to establish a new connection. Now the lock is acquired after the connection has been created. The downside of this approach is that concurrent goroutine may be trying to open a connection to the same target. The loser in the race will then have to Close the connection and use the winner's instead. Signed-off-by: Paulo Gomes <paulo.gomes@weave.works>
This commit is contained in:
parent
b264a3513d
commit
68eece48fb
|
@ -102,6 +102,7 @@ type sshSmartSubtransport struct {
|
||||||
stdout io.Reader
|
stdout io.Reader
|
||||||
currentStream *sshSmartSubtransportStream
|
currentStream *sshSmartSubtransportStream
|
||||||
ckey string
|
ckey string
|
||||||
|
addr string
|
||||||
}
|
}
|
||||||
|
|
||||||
// aMux is the read-write mutex to control access to sshClients.
|
// aMux is the read-write mutex to control access to sshClients.
|
||||||
|
@ -182,6 +183,7 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
||||||
port = u.Port()
|
port = u.Port()
|
||||||
}
|
}
|
||||||
addr = fmt.Sprintf("%s:%s", u.Hostname(), port)
|
addr = fmt.Sprintf("%s:%s", u.Hostname(), port)
|
||||||
|
t.addr = addr
|
||||||
|
|
||||||
ckey, sshConfig, err := cacheKeyAndConfig(addr, cred)
|
ckey, sshConfig, err := cacheKeyAndConfig(addr, cred)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -229,9 +231,9 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
||||||
if t.session, err = t.client.NewSession(); err != nil {
|
if t.session, err = t.client.NewSession(); err != nil {
|
||||||
discardCachedSshClient(ckey)
|
discardCachedSshClient(ckey)
|
||||||
|
|
||||||
// if the current connection was cached, and the error is EOF,
|
// if the current connection was cached, we can try again
|
||||||
// we can try again as this may be a stale connection.
|
// as this may be a stale connection.
|
||||||
if !(cacheHit && err.Error() == "EOF") {
|
if !cacheHit {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -274,9 +276,6 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.ClientConfig) error {
|
func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.ClientConfig) error {
|
||||||
aMux.Lock()
|
|
||||||
defer aMux.Unlock()
|
|
||||||
|
|
||||||
// In some scenarios the ssh handshake can hang indefinitely at
|
// In some scenarios the ssh handshake can hang indefinitely at
|
||||||
// golang.org/x/crypto/ssh.(*handshakeTransport).kexLoop.
|
// golang.org/x/crypto/ssh.(*handshakeTransport).kexLoop.
|
||||||
//
|
//
|
||||||
|
@ -284,8 +283,9 @@ func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.Clie
|
||||||
done := make(chan error, 1)
|
done := make(chan error, 1)
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
var c *ssh.Client
|
||||||
go func() {
|
go func() {
|
||||||
t.client, err = ssh.Dial("tcp", addr, sshConfig)
|
c, err = ssh.Dial("tcp", addr, sshConfig)
|
||||||
done <- err
|
done <- err
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -304,8 +304,24 @@ func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.Clie
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.client = c
|
||||||
|
|
||||||
|
// Mutex is set here to avoid the network latency being
|
||||||
|
// absorbed by all competing goroutines.
|
||||||
|
aMux.Lock()
|
||||||
|
defer aMux.Unlock()
|
||||||
|
|
||||||
|
// A different goroutine won the race, dispose the connection
|
||||||
|
// and carry on.
|
||||||
|
if _, ok := sshClients[ckey]; ok {
|
||||||
|
go func() {
|
||||||
|
_ = c.Close()
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
sshClients[ckey] = &cachedClient{
|
sshClients[ckey] = &cachedClient{
|
||||||
Client: t.client,
|
Client: c,
|
||||||
activeSessions: 1,
|
activeSessions: 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -322,7 +338,7 @@ func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.Clie
|
||||||
// may impair the transport to have successful actions on a new
|
// may impair the transport to have successful actions on a new
|
||||||
// SmartSubTransport (i.e. unreleased resources, staled connections).
|
// SmartSubTransport (i.e. unreleased resources, staled connections).
|
||||||
func (t *sshSmartSubtransport) Close() error {
|
func (t *sshSmartSubtransport) Close() error {
|
||||||
traceLog.Info("[ssh]: sshSmartSubtransport.Close()")
|
traceLog.Info("[ssh]: sshSmartSubtransport.Close()", "server", t.addr)
|
||||||
t.currentStream = nil
|
t.currentStream = nil
|
||||||
if t.client != nil && t.stdin != nil {
|
if t.client != nil && t.stdin != nil {
|
||||||
_ = t.stdin.Close()
|
_ = t.stdin.Close()
|
||||||
|
@ -330,13 +346,8 @@ func (t *sshSmartSubtransport) Close() error {
|
||||||
t.client = nil
|
t.client = nil
|
||||||
|
|
||||||
if t.session != nil {
|
if t.session != nil {
|
||||||
traceLog.Info("[ssh]: session.Close()")
|
traceLog.Info("[ssh]: session.Close()", "server", t.addr)
|
||||||
err := t.session.Close()
|
_ = t.session.Close()
|
||||||
// failure closing a session suggests a stale connection.
|
|
||||||
if err != nil && t.ckey != "" {
|
|
||||||
discardCachedSshClient(t.ckey)
|
|
||||||
t.ckey = ""
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
t.session = nil
|
t.session = nil
|
||||||
|
|
||||||
|
@ -439,16 +450,14 @@ func discardCachedSshClient(key string) {
|
||||||
defer aMux.Unlock()
|
defer aMux.Unlock()
|
||||||
|
|
||||||
if v, found := sshClients[key]; found {
|
if v, found := sshClients[key]; found {
|
||||||
traceLog.Info("[ssh]: discard cached ssh client")
|
traceLog.Info("[ssh]: discard cached ssh client", "activeSessions", v.activeSessions)
|
||||||
|
|
||||||
v.activeSessions--
|
|
||||||
closeConn := func() {
|
closeConn := func() {
|
||||||
if v.Client != nil {
|
// run as async goroutine to minimise mutex time in immediate closures.
|
||||||
// run as async goroutine to minimise mutex time in immediate closures.
|
go func() {
|
||||||
go func() {
|
if v.Client != nil {
|
||||||
_ = v.Client.Close()
|
_ = v.Client.Close()
|
||||||
}()
|
}
|
||||||
}
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// if no active sessions for this connection, close it right-away.
|
// if no active sessions for this connection, close it right-away.
|
||||||
|
|
Loading…
Reference in New Issue