Merge pull request #753 from fluxcd/libgit2-ssh-race-fixes

libgit2/managed: fix race conditions in ssh transport
This commit is contained in:
Sunny 2022-06-03 02:16:13 +05:30 committed by GitHub
commit c5a5707ede
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 57 additions and 32 deletions

View File

@ -46,12 +46,14 @@ package managed
import ( import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
"net/url" "net/url"
"runtime" "runtime"
"strings" "strings"
"sync"
"time" "time"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
@ -83,16 +85,22 @@ func sshSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transp
type sshSmartSubtransport struct { type sshSmartSubtransport struct {
transport *git2go.Transport transport *git2go.Transport
lastAction git2go.SmartServiceAction lastAction git2go.SmartServiceAction
stdin io.WriteCloser
stdout io.Reader
addr string
ctx context.Context
con connection
}
type connection struct {
conn net.Conn conn net.Conn
client *ssh.Client client *ssh.Client
session *ssh.Session session *ssh.Session
stdin io.WriteCloser
stdout io.Reader
currentStream *sshSmartSubtransportStream currentStream *sshSmartSubtransportStream
addr string
connected bool connected bool
ctx context.Context m sync.Mutex
} }
func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) { func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
@ -128,17 +136,17 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
var cmd string var cmd string
switch action { switch action {
case git2go.SmartServiceActionUploadpackLs, git2go.SmartServiceActionUploadpack: case git2go.SmartServiceActionUploadpackLs, git2go.SmartServiceActionUploadpack:
if t.currentStream != nil { if t.con.currentStream != nil {
if t.lastAction == git2go.SmartServiceActionUploadpackLs { if t.lastAction == git2go.SmartServiceActionUploadpackLs {
return t.currentStream, nil return t.con.currentStream, nil
} }
} }
cmd = fmt.Sprintf("git-upload-pack '%s'", uPath) cmd = fmt.Sprintf("git-upload-pack '%s'", uPath)
case git2go.SmartServiceActionReceivepackLs, git2go.SmartServiceActionReceivepack: case git2go.SmartServiceActionReceivepackLs, git2go.SmartServiceActionReceivepack:
if t.currentStream != nil { if t.con.currentStream != nil {
if t.lastAction == git2go.SmartServiceActionReceivepackLs { if t.lastAction == git2go.SmartServiceActionReceivepackLs {
return t.currentStream, nil return t.con.currentStream, nil
} }
} }
cmd = fmt.Sprintf("git-receive-pack '%s'", uPath) cmd = fmt.Sprintf("git-receive-pack '%s'", uPath)
@ -147,7 +155,7 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
return nil, fmt.Errorf("unexpected action: %v", action) return nil, fmt.Errorf("unexpected action: %v", action)
} }
if t.connected { if t.con.connected {
// Disregard errors from previous stream, futher details inside Close(). // Disregard errors from previous stream, futher details inside Close().
_ = t.Close() _ = t.Close()
} }
@ -185,21 +193,23 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
if err != nil { if err != nil {
return nil, err return nil, err
} }
t.connected = true t.con.m.Lock()
t.con.connected = true
t.con.m.Unlock()
traceLog.Info("[ssh]: creating new ssh session") traceLog.Info("[ssh]: creating new ssh session")
if t.session, err = t.client.NewSession(); err != nil { if t.con.session, err = t.con.client.NewSession(); err != nil {
return nil, err return nil, err
} }
if t.stdin, err = t.session.StdinPipe(); err != nil { if t.stdin, err = t.con.session.StdinPipe(); err != nil {
return nil, err return nil, err
} }
var w *io.PipeWriter var w *io.PipeWriter
var reader io.Reader var reader io.Reader
t.stdout, w = io.Pipe() t.stdout, w = io.Pipe()
if reader, err = t.session.StdoutPipe(); err != nil { if reader, err = t.con.session.StdoutPipe(); err != nil {
return nil, err return nil, err
} }
@ -208,7 +218,15 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
// //
// xref: https://github.com/golang/crypto/blob/eb4f295cb31f7fb5d52810411604a2638c9b19a2/ssh/session.go#L553-L558 // xref: https://github.com/golang/crypto/blob/eb4f295cb31f7fb5d52810411604a2638c9b19a2/ssh/session.go#L553-L558
go func() error { go func() error {
defer w.Close() defer func() {
w.Close()
// In case this goroutine panics, handle recovery.
if r := recover(); r != nil {
traceLog.Error(errors.New(r.(string)),
"[ssh]: recovered from libgit2 ssh smart subtransport panic", "address", t.addr)
}
}()
var cancel context.CancelFunc var cancel context.CancelFunc
ctx := t.ctx ctx := t.ctx
@ -226,9 +244,12 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
return nil return nil
default: default:
if !t.connected { t.con.m.Lock()
if !t.con.connected {
t.con.m.Unlock()
return nil return nil
} }
t.con.m.Unlock()
_, err := io.Copy(w, reader) _, err := io.Copy(w, reader)
if err != nil { if err != nil {
@ -240,16 +261,16 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
}() }()
traceLog.Info("[ssh]: run on remote", "cmd", cmd) traceLog.Info("[ssh]: run on remote", "cmd", cmd)
if err := t.session.Start(cmd); err != nil { if err := t.con.session.Start(cmd); err != nil {
return nil, err return nil, err
} }
t.lastAction = action t.lastAction = action
t.currentStream = &sshSmartSubtransportStream{ t.con.currentStream = &sshSmartSubtransportStream{
owner: t, owner: t,
} }
return t.currentStream, nil return t.con.currentStream, nil
} }
func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConfig) error { func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConfig) error {
@ -265,8 +286,8 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
return err return err
} }
t.conn = conn t.con.conn = conn
t.client = ssh.NewClient(c, chans, reqs) t.con.client = ssh.NewClient(c, chans, reqs)
return nil return nil
} }
@ -282,31 +303,35 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
// SmartSubTransport (i.e. unreleased resources, staled connections). // SmartSubTransport (i.e. unreleased resources, staled connections).
func (t *sshSmartSubtransport) Close() error { func (t *sshSmartSubtransport) Close() error {
traceLog.Info("[ssh]: sshSmartSubtransport.Close()", "server", t.addr) traceLog.Info("[ssh]: sshSmartSubtransport.Close()", "server", t.addr)
t.currentStream = nil t.con.m.Lock()
if t.client != nil && t.stdin != nil { defer t.con.m.Unlock()
t.con.currentStream = nil
if t.con.client != nil && t.stdin != nil {
_ = t.stdin.Close() _ = t.stdin.Close()
} }
t.client = nil t.con.client = nil
if t.session != nil { if t.con.session != nil {
traceLog.Info("[ssh]: session.Close()", "server", t.addr) traceLog.Info("[ssh]: session.Close()", "server", t.addr)
_ = t.session.Close() _ = t.con.session.Close()
} }
t.session = nil t.con.session = nil
return nil return nil
} }
func (t *sshSmartSubtransport) Free() { func (t *sshSmartSubtransport) Free() {
traceLog.Info("[ssh]: sshSmartSubtransport.Free()") traceLog.Info("[ssh]: sshSmartSubtransport.Free()")
if t.client != nil { if t.con.client != nil {
_ = t.client.Close() _ = t.con.client.Close()
} }
if t.conn != nil { if t.con.conn != nil {
_ = t.conn.Close() _ = t.con.conn.Close()
} }
t.connected = false t.con.m.Lock()
t.con.connected = false
t.con.m.Unlock()
} }
type sshSmartSubtransportStream struct { type sshSmartSubtransportStream struct {