libgit2/managed: fix race issues in ssh transport
Race conditions in ssh smart subtransport caused some goroutines to panic, resulting in crashing the whole controller, mostly evident in image-automation-controller CI runs. Panic recovery in the main thread do not handle goroutine panics. So, the existing panic recovery code in libgit2 Checkout() methods weren't able to handle it. This change groups the fields in ssh smart subtransport that may be accessed by multiple goroutines into a new struct with a mutex. Also adds panic recovery in the created goroutine to handle any other possible panics. Signed-off-by: Sunny <darkowlzz@protonmail.com>
This commit is contained in:
parent
dc1037d962
commit
7f7490ebf0
|
@ -46,12 +46,14 @@ package managed
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/crypto/ssh"
|
"golang.org/x/crypto/ssh"
|
||||||
|
@ -84,15 +86,21 @@ type sshSmartSubtransport struct {
|
||||||
transport *git2go.Transport
|
transport *git2go.Transport
|
||||||
|
|
||||||
lastAction git2go.SmartServiceAction
|
lastAction git2go.SmartServiceAction
|
||||||
|
stdin io.WriteCloser
|
||||||
|
stdout io.Reader
|
||||||
|
addr string
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
|
con connection
|
||||||
|
}
|
||||||
|
|
||||||
|
type connection struct {
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
client *ssh.Client
|
client *ssh.Client
|
||||||
session *ssh.Session
|
session *ssh.Session
|
||||||
stdin io.WriteCloser
|
|
||||||
stdout io.Reader
|
|
||||||
currentStream *sshSmartSubtransportStream
|
currentStream *sshSmartSubtransportStream
|
||||||
addr string
|
|
||||||
connected bool
|
connected bool
|
||||||
ctx context.Context
|
m sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
|
func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
|
||||||
|
@ -128,17 +136,17 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
|
||||||
var cmd string
|
var cmd string
|
||||||
switch action {
|
switch action {
|
||||||
case git2go.SmartServiceActionUploadpackLs, git2go.SmartServiceActionUploadpack:
|
case git2go.SmartServiceActionUploadpackLs, git2go.SmartServiceActionUploadpack:
|
||||||
if t.currentStream != nil {
|
if t.con.currentStream != nil {
|
||||||
if t.lastAction == git2go.SmartServiceActionUploadpackLs {
|
if t.lastAction == git2go.SmartServiceActionUploadpackLs {
|
||||||
return t.currentStream, nil
|
return t.con.currentStream, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cmd = fmt.Sprintf("git-upload-pack '%s'", uPath)
|
cmd = fmt.Sprintf("git-upload-pack '%s'", uPath)
|
||||||
|
|
||||||
case git2go.SmartServiceActionReceivepackLs, git2go.SmartServiceActionReceivepack:
|
case git2go.SmartServiceActionReceivepackLs, git2go.SmartServiceActionReceivepack:
|
||||||
if t.currentStream != nil {
|
if t.con.currentStream != nil {
|
||||||
if t.lastAction == git2go.SmartServiceActionReceivepackLs {
|
if t.lastAction == git2go.SmartServiceActionReceivepackLs {
|
||||||
return t.currentStream, nil
|
return t.con.currentStream, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cmd = fmt.Sprintf("git-receive-pack '%s'", uPath)
|
cmd = fmt.Sprintf("git-receive-pack '%s'", uPath)
|
||||||
|
@ -147,7 +155,7 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
|
||||||
return nil, fmt.Errorf("unexpected action: %v", action)
|
return nil, fmt.Errorf("unexpected action: %v", action)
|
||||||
}
|
}
|
||||||
|
|
||||||
if t.connected {
|
if t.con.connected {
|
||||||
// Disregard errors from previous stream, futher details inside Close().
|
// Disregard errors from previous stream, futher details inside Close().
|
||||||
_ = t.Close()
|
_ = t.Close()
|
||||||
}
|
}
|
||||||
|
@ -185,21 +193,23 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
t.connected = true
|
t.con.m.Lock()
|
||||||
|
t.con.connected = true
|
||||||
|
t.con.m.Unlock()
|
||||||
|
|
||||||
traceLog.Info("[ssh]: creating new ssh session")
|
traceLog.Info("[ssh]: creating new ssh session")
|
||||||
if t.session, err = t.client.NewSession(); err != nil {
|
if t.con.session, err = t.con.client.NewSession(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if t.stdin, err = t.session.StdinPipe(); err != nil {
|
if t.stdin, err = t.con.session.StdinPipe(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var w *io.PipeWriter
|
var w *io.PipeWriter
|
||||||
var reader io.Reader
|
var reader io.Reader
|
||||||
t.stdout, w = io.Pipe()
|
t.stdout, w = io.Pipe()
|
||||||
if reader, err = t.session.StdoutPipe(); err != nil {
|
if reader, err = t.con.session.StdoutPipe(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -208,7 +218,15 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
|
||||||
//
|
//
|
||||||
// xref: https://github.com/golang/crypto/blob/eb4f295cb31f7fb5d52810411604a2638c9b19a2/ssh/session.go#L553-L558
|
// xref: https://github.com/golang/crypto/blob/eb4f295cb31f7fb5d52810411604a2638c9b19a2/ssh/session.go#L553-L558
|
||||||
go func() error {
|
go func() error {
|
||||||
defer w.Close()
|
defer func() {
|
||||||
|
w.Close()
|
||||||
|
|
||||||
|
// In case this goroutine panics, handle recovery.
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
traceLog.Error(errors.New(r.(string)),
|
||||||
|
"[ssh]: recovered from libgit2 ssh smart subtransport panic", "address", t.addr)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
ctx := t.ctx
|
ctx := t.ctx
|
||||||
|
@ -226,9 +244,12 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
default:
|
default:
|
||||||
if !t.connected {
|
t.con.m.Lock()
|
||||||
|
if !t.con.connected {
|
||||||
|
t.con.m.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
t.con.m.Unlock()
|
||||||
|
|
||||||
_, err := io.Copy(w, reader)
|
_, err := io.Copy(w, reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -240,16 +261,16 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
|
||||||
}()
|
}()
|
||||||
|
|
||||||
traceLog.Info("[ssh]: run on remote", "cmd", cmd)
|
traceLog.Info("[ssh]: run on remote", "cmd", cmd)
|
||||||
if err := t.session.Start(cmd); err != nil {
|
if err := t.con.session.Start(cmd); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
t.lastAction = action
|
t.lastAction = action
|
||||||
t.currentStream = &sshSmartSubtransportStream{
|
t.con.currentStream = &sshSmartSubtransportStream{
|
||||||
owner: t,
|
owner: t,
|
||||||
}
|
}
|
||||||
|
|
||||||
return t.currentStream, nil
|
return t.con.currentStream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConfig) error {
|
func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConfig) error {
|
||||||
|
@ -265,8 +286,8 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
t.conn = conn
|
t.con.conn = conn
|
||||||
t.client = ssh.NewClient(c, chans, reqs)
|
t.con.client = ssh.NewClient(c, chans, reqs)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -282,31 +303,35 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
|
||||||
// SmartSubTransport (i.e. unreleased resources, staled connections).
|
// SmartSubTransport (i.e. unreleased resources, staled connections).
|
||||||
func (t *sshSmartSubtransport) Close() error {
|
func (t *sshSmartSubtransport) Close() error {
|
||||||
traceLog.Info("[ssh]: sshSmartSubtransport.Close()", "server", t.addr)
|
traceLog.Info("[ssh]: sshSmartSubtransport.Close()", "server", t.addr)
|
||||||
t.currentStream = nil
|
t.con.m.Lock()
|
||||||
if t.client != nil && t.stdin != nil {
|
defer t.con.m.Unlock()
|
||||||
|
t.con.currentStream = nil
|
||||||
|
if t.con.client != nil && t.stdin != nil {
|
||||||
_ = t.stdin.Close()
|
_ = t.stdin.Close()
|
||||||
}
|
}
|
||||||
t.client = nil
|
t.con.client = nil
|
||||||
|
|
||||||
if t.session != nil {
|
if t.con.session != nil {
|
||||||
traceLog.Info("[ssh]: session.Close()", "server", t.addr)
|
traceLog.Info("[ssh]: session.Close()", "server", t.addr)
|
||||||
_ = t.session.Close()
|
_ = t.con.session.Close()
|
||||||
}
|
}
|
||||||
t.session = nil
|
t.con.session = nil
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *sshSmartSubtransport) Free() {
|
func (t *sshSmartSubtransport) Free() {
|
||||||
traceLog.Info("[ssh]: sshSmartSubtransport.Free()")
|
traceLog.Info("[ssh]: sshSmartSubtransport.Free()")
|
||||||
if t.client != nil {
|
if t.con.client != nil {
|
||||||
_ = t.client.Close()
|
_ = t.con.client.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
if t.conn != nil {
|
if t.con.conn != nil {
|
||||||
_ = t.conn.Close()
|
_ = t.con.conn.Close()
|
||||||
}
|
}
|
||||||
t.connected = false
|
t.con.m.Lock()
|
||||||
|
t.con.connected = false
|
||||||
|
t.con.m.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
type sshSmartSubtransportStream struct {
|
type sshSmartSubtransportStream struct {
|
||||||
|
|
Loading…
Reference in New Issue