libgit2: remove deadlock
Some scenarios may lead to deadlocks, specially in image automation controller. Signed-off-by: Paulo Gomes <paulo.gomes@weave.works>
This commit is contained in:
parent
812f6e49dd
commit
09fae634df
|
@ -54,6 +54,7 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/crypto/ssh"
|
"golang.org/x/crypto/ssh"
|
||||||
|
@ -80,10 +81,12 @@ func registerManagedSSH() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func sshSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) {
|
func sshSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) {
|
||||||
|
var closed int32 = 0
|
||||||
return &sshSmartSubtransport{
|
return &sshSmartSubtransport{
|
||||||
transport: transport,
|
transport: transport,
|
||||||
ctx: context.Background(),
|
ctx: context.Background(),
|
||||||
logger: logr.Discard(),
|
logger: logr.Discard(),
|
||||||
|
closedSessions: &closed,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,6 +112,8 @@ type sshSmartSubtransport struct {
|
||||||
stdin io.WriteCloser
|
stdin io.WriteCloser
|
||||||
stdout io.Reader
|
stdout io.Reader
|
||||||
|
|
||||||
|
closedSessions *int32
|
||||||
|
|
||||||
con connection
|
con connection
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,7 +122,6 @@ type connection struct {
|
||||||
session *ssh.Session
|
session *ssh.Session
|
||||||
currentStream *sshSmartSubtransportStream
|
currentStream *sshSmartSubtransportStream
|
||||||
connected bool
|
connected bool
|
||||||
m sync.RWMutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
|
func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
|
||||||
|
@ -208,13 +212,11 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
t.con.m.RLock()
|
if t.con.connected {
|
||||||
if t.con.connected == true {
|
|
||||||
// The connection is no longer shared across actions, so ensures
|
// The connection is no longer shared across actions, so ensures
|
||||||
// all has been released before starting a new connection.
|
// all has been released before starting a new connection.
|
||||||
_ = t.Close()
|
_ = t.Close()
|
||||||
}
|
}
|
||||||
t.con.m.RUnlock()
|
|
||||||
|
|
||||||
err = t.createConn(addr, sshConfig)
|
err = t.createConn(addr, sshConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -251,7 +253,6 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
|
||||||
"recovered from libgit2 ssh smart subtransport panic")
|
"recovered from libgit2 ssh smart subtransport panic")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
ctx := t.ctx
|
ctx := t.ctx
|
||||||
|
|
||||||
|
@ -261,6 +262,7 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
|
||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
closedAlready := atomic.LoadInt32(t.closedSessions)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -268,12 +270,9 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
default:
|
default:
|
||||||
t.con.m.RLock()
|
if atomic.LoadInt32(t.closedSessions) > closedAlready {
|
||||||
if !t.con.connected {
|
|
||||||
t.con.m.RUnlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
t.con.m.RUnlock()
|
|
||||||
|
|
||||||
_, err := io.Copy(w, reader)
|
_, err := io.Copy(w, reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -311,10 +310,8 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
t.con.m.Lock()
|
|
||||||
t.con.connected = true
|
t.con.connected = true
|
||||||
t.con.client = ssh.NewClient(c, chans, reqs)
|
t.con.client = ssh.NewClient(c, chans, reqs)
|
||||||
t.con.m.Unlock()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -330,8 +327,6 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
|
||||||
// SmartSubTransport (i.e. unreleased resources, staled connections).
|
// SmartSubTransport (i.e. unreleased resources, staled connections).
|
||||||
func (t *sshSmartSubtransport) Close() error {
|
func (t *sshSmartSubtransport) Close() error {
|
||||||
t.logger.V(logger.TraceLevel).Info("sshSmartSubtransport.Close()")
|
t.logger.V(logger.TraceLevel).Info("sshSmartSubtransport.Close()")
|
||||||
t.con.m.Lock()
|
|
||||||
defer t.con.m.Unlock()
|
|
||||||
|
|
||||||
t.con.currentStream = nil
|
t.con.currentStream = nil
|
||||||
if t.con.client != nil && t.stdin != nil {
|
if t.con.client != nil && t.stdin != nil {
|
||||||
|
@ -349,8 +344,10 @@ func (t *sshSmartSubtransport) Close() error {
|
||||||
_ = t.con.client.Close()
|
_ = t.con.client.Close()
|
||||||
t.logger.V(logger.TraceLevel).Info("close client")
|
t.logger.V(logger.TraceLevel).Info("close client")
|
||||||
}
|
}
|
||||||
|
t.con.client = nil
|
||||||
|
|
||||||
t.con.connected = false
|
t.con.connected = false
|
||||||
|
atomic.AddInt32(t.closedSessions, 1)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue