Merge pull request #785 from fluxcd/mutexless

libgit2: remove deadlock
This commit is contained in:
Sunny 2022-06-21 19:51:23 +05:30 committed by GitHub
commit 22951347dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 32 additions and 39 deletions

View File

@ -54,6 +54,7 @@ import (
"runtime" "runtime"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
@ -80,10 +81,12 @@ func registerManagedSSH() error {
} }
func sshSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) { func sshSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) {
var closed int32 = 0
return &sshSmartSubtransport{ return &sshSmartSubtransport{
transport: transport, transport: transport,
ctx: context.Background(), ctx: context.Background(),
logger: logr.Discard(), logger: logr.Discard(),
closedSessions: &closed,
}, nil }, nil
} }
@ -109,15 +112,12 @@ type sshSmartSubtransport struct {
stdin io.WriteCloser stdin io.WriteCloser
stdout io.Reader stdout io.Reader
con connection closedSessions *int32
}
type connection struct {
client *ssh.Client client *ssh.Client
session *ssh.Session session *ssh.Session
currentStream *sshSmartSubtransportStream currentStream *sshSmartSubtransportStream
connected bool connected bool
m sync.RWMutex
} }
func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) { func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
@ -151,17 +151,17 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
var cmd string var cmd string
switch action { switch action {
case git2go.SmartServiceActionUploadpackLs, git2go.SmartServiceActionUploadpack: case git2go.SmartServiceActionUploadpackLs, git2go.SmartServiceActionUploadpack:
if t.con.currentStream != nil { if t.currentStream != nil {
if t.lastAction == git2go.SmartServiceActionUploadpackLs { if t.lastAction == git2go.SmartServiceActionUploadpackLs {
return t.con.currentStream, nil return t.currentStream, nil
} }
} }
cmd = fmt.Sprintf("git-upload-pack '%s'", uPath) cmd = fmt.Sprintf("git-upload-pack '%s'", uPath)
case git2go.SmartServiceActionReceivepackLs, git2go.SmartServiceActionReceivepack: case git2go.SmartServiceActionReceivepackLs, git2go.SmartServiceActionReceivepack:
if t.con.currentStream != nil { if t.currentStream != nil {
if t.lastAction == git2go.SmartServiceActionReceivepackLs { if t.lastAction == git2go.SmartServiceActionReceivepackLs {
return t.con.currentStream, nil return t.currentStream, nil
} }
} }
cmd = fmt.Sprintf("git-receive-pack '%s'", uPath) cmd = fmt.Sprintf("git-receive-pack '%s'", uPath)
@ -208,13 +208,11 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
return nil return nil
} }
t.con.m.RLock() if t.connected {
if t.con.connected == true {
// The connection is no longer shared across actions, so ensures // The connection is no longer shared across actions, so ensures
// all has been released before starting a new connection. // all has been released before starting a new connection.
_ = t.Close() _ = t.Close()
} }
t.con.m.RUnlock()
err = t.createConn(addr, sshConfig) err = t.createConn(addr, sshConfig)
if err != nil { if err != nil {
@ -222,18 +220,18 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
} }
t.logger.V(logger.TraceLevel).Info("creating new ssh session") t.logger.V(logger.TraceLevel).Info("creating new ssh session")
if t.con.session, err = t.con.client.NewSession(); err != nil { if t.session, err = t.client.NewSession(); err != nil {
return nil, err return nil, err
} }
if t.stdin, err = t.con.session.StdinPipe(); err != nil { if t.stdin, err = t.session.StdinPipe(); err != nil {
return nil, err return nil, err
} }
var w *io.PipeWriter var w *io.PipeWriter
var reader io.Reader var reader io.Reader
t.stdout, w = io.Pipe() t.stdout, w = io.Pipe()
if reader, err = t.con.session.StdoutPipe(); err != nil { if reader, err = t.session.StdoutPipe(); err != nil {
return nil, err return nil, err
} }
@ -251,7 +249,6 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
"recovered from libgit2 ssh smart subtransport panic") "recovered from libgit2 ssh smart subtransport panic")
} }
}() }()
var cancel context.CancelFunc var cancel context.CancelFunc
ctx := t.ctx ctx := t.ctx
@ -261,6 +258,7 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
defer cancel() defer cancel()
} }
closedAlready := atomic.LoadInt32(t.closedSessions)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -268,12 +266,9 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
return nil return nil
default: default:
t.con.m.RLock() if atomic.LoadInt32(t.closedSessions) > closedAlready {
if !t.con.connected {
t.con.m.RUnlock()
return nil return nil
} }
t.con.m.RUnlock()
_, err := io.Copy(w, reader) _, err := io.Copy(w, reader)
if err != nil { if err != nil {
@ -285,16 +280,16 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
}() }()
t.logger.V(logger.TraceLevel).Info("run on remote", "cmd", cmd) t.logger.V(logger.TraceLevel).Info("run on remote", "cmd", cmd)
if err := t.con.session.Start(cmd); err != nil { if err := t.session.Start(cmd); err != nil {
return nil, err return nil, err
} }
t.lastAction = action t.lastAction = action
t.con.currentStream = &sshSmartSubtransportStream{ t.currentStream = &sshSmartSubtransportStream{
owner: t, owner: t,
} }
return t.con.currentStream, nil return t.currentStream, nil
} }
func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConfig) error { func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConfig) error {
@ -311,10 +306,8 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
return err return err
} }
t.con.m.Lock() t.connected = true
t.con.connected = true t.client = ssh.NewClient(c, chans, reqs)
t.con.client = ssh.NewClient(c, chans, reqs)
t.con.m.Unlock()
return nil return nil
} }
@ -330,27 +323,27 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
// SmartSubTransport (i.e. unreleased resources, staled connections). // SmartSubTransport (i.e. unreleased resources, staled connections).
func (t *sshSmartSubtransport) Close() error { func (t *sshSmartSubtransport) Close() error {
t.logger.V(logger.TraceLevel).Info("sshSmartSubtransport.Close()") t.logger.V(logger.TraceLevel).Info("sshSmartSubtransport.Close()")
t.con.m.Lock()
defer t.con.m.Unlock()
t.con.currentStream = nil t.currentStream = nil
if t.con.client != nil && t.stdin != nil { if t.client != nil && t.stdin != nil {
_ = t.stdin.Close() _ = t.stdin.Close()
} }
t.stdin = nil t.stdin = nil
if t.con.session != nil { if t.session != nil {
t.logger.V(logger.TraceLevel).Info("session.Close()") t.logger.V(logger.TraceLevel).Info("session.Close()")
_ = t.con.session.Close() _ = t.session.Close()
} }
t.con.session = nil t.session = nil
if t.con.client != nil { if t.client != nil {
_ = t.con.client.Close() _ = t.client.Close()
t.logger.V(logger.TraceLevel).Info("close client") t.logger.V(logger.TraceLevel).Info("close client")
} }
t.client = nil
t.con.connected = false t.connected = false
atomic.AddInt32(t.closedSessions, 1)
return nil return nil
} }