Merge pull request #658 from pjbgf/libgit2-fixes
libgit2: managed transport improvements
This commit is contained in:
commit
8593d5877f
|
|
@ -0,0 +1,27 @@
|
||||||
|
/*
|
||||||
|
Copyright 2022 The Flux authors
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package managed
|
||||||
|
|
||||||
|
const (
|
||||||
|
// URLMaxLength represents the max length for the entire URL
|
||||||
|
// when cloning Git repositories via HTTP(S).
|
||||||
|
URLMaxLength = 2048
|
||||||
|
|
||||||
|
// PathMaxLength represents the max length for the path element
|
||||||
|
// when cloning Git repositories via SSH.
|
||||||
|
PathMaxLength = 4096
|
||||||
|
)
|
||||||
|
|
@ -171,6 +171,10 @@ func createClientRequest(targetUrl string, action git2go.SmartServiceAction, t *
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(finalUrl) > URLMaxLength {
|
||||||
|
return nil, nil, fmt.Errorf("URL exceeds the max length (%d)", URLMaxLength)
|
||||||
|
}
|
||||||
|
|
||||||
client := &http.Client{
|
client := &http.Client{
|
||||||
Transport: t,
|
Transport: t,
|
||||||
Timeout: fullHttpClientTimeOut,
|
Timeout: fullHttpClientTimeOut,
|
||||||
|
|
|
||||||
|
|
@ -101,11 +101,18 @@ type sshSmartSubtransport struct {
|
||||||
stdin io.WriteCloser
|
stdin io.WriteCloser
|
||||||
stdout io.Reader
|
stdout io.Reader
|
||||||
currentStream *sshSmartSubtransportStream
|
currentStream *sshSmartSubtransportStream
|
||||||
|
ckey string
|
||||||
|
addr string
|
||||||
}
|
}
|
||||||
|
|
||||||
// aMux is the read-write mutex to control access to sshClients.
|
// aMux is the read-write mutex to control access to sshClients.
|
||||||
var aMux sync.RWMutex
|
var aMux sync.RWMutex
|
||||||
|
|
||||||
|
type cachedClient struct {
|
||||||
|
*ssh.Client
|
||||||
|
activeSessions uint16
|
||||||
|
}
|
||||||
|
|
||||||
// sshClients stores active ssh clients/connections to be reused.
|
// sshClients stores active ssh clients/connections to be reused.
|
||||||
//
|
//
|
||||||
// Once opened, connections will be kept cached until an error occurs
|
// Once opened, connections will be kept cached until an error occurs
|
||||||
|
|
@ -113,7 +120,7 @@ var aMux sync.RWMutex
|
||||||
// a follow-up cache miss.
|
// a follow-up cache miss.
|
||||||
//
|
//
|
||||||
// The key must be based on cacheKey, refer to that function's comments.
|
// The key must be based on cacheKey, refer to that function's comments.
|
||||||
var sshClients map[string]*ssh.Client = make(map[string]*ssh.Client)
|
var sshClients map[string]*cachedClient = make(map[string]*cachedClient)
|
||||||
|
|
||||||
func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
|
func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
|
||||||
runtime.LockOSThread()
|
runtime.LockOSThread()
|
||||||
|
|
@ -124,12 +131,19 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Escape \ and '.
|
if len(u.Path) > PathMaxLength {
|
||||||
uPath := strings.Replace(u.Path, `\`, `\\`, -1)
|
return nil, fmt.Errorf("path exceeds the max length (%d)", PathMaxLength)
|
||||||
uPath = strings.Replace(uPath, `'`, `\'`, -1)
|
}
|
||||||
|
|
||||||
// TODO: Add percentage decode similar to libgit2.
|
// decode URI's path
|
||||||
// Refer: https://github.com/libgit2/libgit2/blob/358a60e1b46000ea99ef10b4dd709e92f75ff74b/src/str.c#L455-L481
|
uPath, err := url.PathUnescape(u.Path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Escape \ and '.
|
||||||
|
uPath = strings.Replace(uPath, `\`, `\\`, -1)
|
||||||
|
uPath = strings.Replace(uPath, `'`, `\'`, -1)
|
||||||
|
|
||||||
var cmd string
|
var cmd string
|
||||||
switch action {
|
switch action {
|
||||||
|
|
@ -138,9 +152,8 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
||||||
if t.lastAction == git2go.SmartServiceActionUploadpackLs {
|
if t.lastAction == git2go.SmartServiceActionUploadpackLs {
|
||||||
return t.currentStream, nil
|
return t.currentStream, nil
|
||||||
}
|
}
|
||||||
if err := t.Close(); err != nil {
|
// Disregard errors from previous stream, futher details inside Close().
|
||||||
traceLog.Error(err, "[ssh]: error cleaning up previous stream")
|
_ = t.Close()
|
||||||
}
|
|
||||||
}
|
}
|
||||||
cmd = fmt.Sprintf("git-upload-pack '%s'", uPath)
|
cmd = fmt.Sprintf("git-upload-pack '%s'", uPath)
|
||||||
|
|
||||||
|
|
@ -149,9 +162,8 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
||||||
if t.lastAction == git2go.SmartServiceActionReceivepackLs {
|
if t.lastAction == git2go.SmartServiceActionReceivepackLs {
|
||||||
return t.currentStream, nil
|
return t.currentStream, nil
|
||||||
}
|
}
|
||||||
if err := t.Close(); err != nil {
|
// Disregard errors from previous stream, futher details inside Close().
|
||||||
traceLog.Error(err, "[ssh]: error cleaning up previous stream")
|
_ = t.Close()
|
||||||
}
|
|
||||||
}
|
}
|
||||||
cmd = fmt.Sprintf("git-receive-pack '%s'", uPath)
|
cmd = fmt.Sprintf("git-receive-pack '%s'", uPath)
|
||||||
|
|
||||||
|
|
@ -159,7 +171,7 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
||||||
return nil, fmt.Errorf("unexpected action: %v", action)
|
return nil, fmt.Errorf("unexpected action: %v", action)
|
||||||
}
|
}
|
||||||
|
|
||||||
cred, err := t.transport.SmartCredentials("", git2go.CredentialTypeSSHKey|git2go.CredentialTypeSSHMemory)
|
cred, err := t.transport.SmartCredentials("", git2go.CredentialTypeSSHMemory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -171,11 +183,14 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
||||||
port = u.Port()
|
port = u.Port()
|
||||||
}
|
}
|
||||||
addr = fmt.Sprintf("%s:%s", u.Hostname(), port)
|
addr = fmt.Sprintf("%s:%s", u.Hostname(), port)
|
||||||
|
t.addr = addr
|
||||||
|
|
||||||
ckey, sshConfig, err := cacheKeyAndConfig(addr, cred)
|
ckey, sshConfig, err := cacheKeyAndConfig(addr, cred)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
t.ckey = ckey
|
||||||
|
|
||||||
sshConfig.HostKeyCallback = func(hostname string, remote net.Addr, key ssh.PublicKey) error {
|
sshConfig.HostKeyCallback = func(hostname string, remote net.Addr, key ssh.PublicKey) error {
|
||||||
marshaledKey := key.Marshal()
|
marshaledKey := key.Marshal()
|
||||||
cert := &git2go.Certificate{
|
cert := &git2go.Certificate{
|
||||||
|
|
@ -193,53 +208,49 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
||||||
return t.transport.SmartCertificateCheck(cert, true, hostname)
|
return t.transport.SmartCertificateCheck(cert, true, hostname)
|
||||||
}
|
}
|
||||||
|
|
||||||
aMux.RLock()
|
var cacheHit bool
|
||||||
|
aMux.Lock()
|
||||||
if c, ok := sshClients[ckey]; ok {
|
if c, ok := sshClients[ckey]; ok {
|
||||||
traceLog.Info("[ssh]: cache hit", "remoteAddress", addr)
|
traceLog.Info("[ssh]: cache hit", "remoteAddress", addr)
|
||||||
t.client = c
|
t.client = c.Client
|
||||||
|
cacheHit = true
|
||||||
|
c.activeSessions++
|
||||||
}
|
}
|
||||||
aMux.RUnlock()
|
aMux.Unlock()
|
||||||
|
|
||||||
if t.client == nil {
|
if t.client == nil {
|
||||||
|
cacheHit = false
|
||||||
traceLog.Info("[ssh]: cache miss", "remoteAddress", addr)
|
traceLog.Info("[ssh]: cache miss", "remoteAddress", addr)
|
||||||
|
err := t.createConn(ckey, addr, sshConfig)
|
||||||
aMux.Lock()
|
|
||||||
defer aMux.Unlock()
|
|
||||||
|
|
||||||
// In some scenarios the ssh handshake can hang indefinitely at
|
|
||||||
// golang.org/x/crypto/ssh.(*handshakeTransport).kexLoop.
|
|
||||||
//
|
|
||||||
// xref: https://github.com/golang/go/issues/51926
|
|
||||||
done := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
t.client, err = ssh.Dial("tcp", addr, sshConfig)
|
|
||||||
done <- err
|
|
||||||
}()
|
|
||||||
|
|
||||||
dialTimeout := sshConfig.Timeout + (30 * time.Second)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case doneErr := <-done:
|
|
||||||
if doneErr != nil {
|
|
||||||
err = fmt.Errorf("ssh.Dial: %w", doneErr)
|
|
||||||
}
|
|
||||||
case <-time.After(dialTimeout):
|
|
||||||
err = fmt.Errorf("timed out waiting for ssh.Dial after %s", dialTimeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sshClients[ckey] = t.client
|
|
||||||
}
|
}
|
||||||
|
|
||||||
traceLog.Info("[ssh]: creating new ssh session")
|
traceLog.Info("[ssh]: creating new ssh session")
|
||||||
if t.session, err = t.client.NewSession(); err != nil {
|
if t.session, err = t.client.NewSession(); err != nil {
|
||||||
discardCachedSshClient(ckey)
|
discardCachedSshClient(ckey)
|
||||||
|
|
||||||
|
// if the current connection was cached, we can try again
|
||||||
|
// as this may be a stale connection.
|
||||||
|
if !cacheHit {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
traceLog.Info("[ssh]: cached connection was stale, retrying...")
|
||||||
|
err = t.createConn(ckey, addr, sshConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
traceLog.Info("[ssh]: creating new ssh session with new connection")
|
||||||
|
t.session, err = t.client.NewSession()
|
||||||
|
if err != nil {
|
||||||
|
discardCachedSshClient(ckey)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if t.stdin, err = t.session.StdinPipe(); err != nil {
|
if t.stdin, err = t.session.StdinPipe(); err != nil {
|
||||||
discardCachedSshClient(ckey)
|
discardCachedSshClient(ckey)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -264,28 +275,83 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
||||||
return t.currentStream, nil
|
return t.currentStream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *sshSmartSubtransport) Close() error {
|
func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.ClientConfig) error {
|
||||||
var returnErr error
|
// In some scenarios the ssh handshake can hang indefinitely at
|
||||||
|
// golang.org/x/crypto/ssh.(*handshakeTransport).kexLoop.
|
||||||
|
//
|
||||||
|
// xref: https://github.com/golang/go/issues/51926
|
||||||
|
done := make(chan error, 1)
|
||||||
|
var err error
|
||||||
|
|
||||||
traceLog.Info("[ssh]: sshSmartSubtransport.Close()")
|
var c *ssh.Client
|
||||||
|
go func() {
|
||||||
|
c, err = ssh.Dial("tcp", addr, sshConfig)
|
||||||
|
done <- err
|
||||||
|
}()
|
||||||
|
|
||||||
|
dialTimeout := sshConfig.Timeout + (30 * time.Second)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case doneErr := <-done:
|
||||||
|
if doneErr != nil {
|
||||||
|
err = fmt.Errorf("ssh.Dial: %w", doneErr)
|
||||||
|
}
|
||||||
|
case <-time.After(dialTimeout):
|
||||||
|
err = fmt.Errorf("timed out waiting for ssh.Dial after %s", dialTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
t.client = c
|
||||||
|
|
||||||
|
// Mutex is set here to avoid the network latency being
|
||||||
|
// absorbed by all competing goroutines.
|
||||||
|
aMux.Lock()
|
||||||
|
defer aMux.Unlock()
|
||||||
|
|
||||||
|
// A different goroutine won the race, dispose the connection
|
||||||
|
// and carry on.
|
||||||
|
if _, ok := sshClients[ckey]; ok {
|
||||||
|
go func() {
|
||||||
|
_ = c.Close()
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sshClients[ckey] = &cachedClient{
|
||||||
|
Client: c,
|
||||||
|
activeSessions: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the smart subtransport.
|
||||||
|
//
|
||||||
|
// This is called internally ahead of a new action, and also
|
||||||
|
// upstream by the transport handler:
|
||||||
|
// https://github.com/libgit2/git2go/blob/0e8009f00a65034d196c67b1cdd82af6f12c34d3/transport.go#L409
|
||||||
|
//
|
||||||
|
// Avoid returning errors, but focus on releasing anything that
|
||||||
|
// may impair the transport to have successful actions on a new
|
||||||
|
// SmartSubTransport (i.e. unreleased resources, staled connections).
|
||||||
|
func (t *sshSmartSubtransport) Close() error {
|
||||||
|
traceLog.Info("[ssh]: sshSmartSubtransport.Close()", "server", t.addr)
|
||||||
t.currentStream = nil
|
t.currentStream = nil
|
||||||
if t.client != nil && t.stdin != nil {
|
if t.client != nil && t.stdin != nil {
|
||||||
if err := t.stdin.Close(); err != nil {
|
_ = t.stdin.Close()
|
||||||
returnErr = fmt.Errorf("cannot close stdin: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
t.client = nil
|
t.client = nil
|
||||||
|
|
||||||
if t.session != nil {
|
if t.session != nil {
|
||||||
traceLog.Info("[ssh]: skipping session.wait")
|
traceLog.Info("[ssh]: session.Close()", "server", t.addr)
|
||||||
traceLog.Info("[ssh]: session.Close()")
|
_ = t.session.Close()
|
||||||
if err := t.session.Close(); err != nil {
|
|
||||||
returnErr = fmt.Errorf("cannot close session: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
t.session = nil
|
t.session = nil
|
||||||
|
|
||||||
return returnErr
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *sshSmartSubtransport) Free() {
|
func (t *sshSmartSubtransport) Free() {
|
||||||
|
|
@ -306,6 +372,13 @@ func (stream *sshSmartSubtransportStream) Write(buf []byte) (int, error) {
|
||||||
|
|
||||||
func (stream *sshSmartSubtransportStream) Free() {
|
func (stream *sshSmartSubtransportStream) Free() {
|
||||||
traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()")
|
traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()")
|
||||||
|
if stream.owner == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if stream.owner.ckey != "" {
|
||||||
|
decrementActiveSessionIfFound(stream.owner.ckey)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, *ssh.ClientConfig, error) {
|
func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, *ssh.ClientConfig, error) {
|
||||||
|
|
@ -376,8 +449,41 @@ func discardCachedSshClient(key string) {
|
||||||
aMux.Lock()
|
aMux.Lock()
|
||||||
defer aMux.Unlock()
|
defer aMux.Unlock()
|
||||||
|
|
||||||
if _, found := sshClients[key]; found {
|
if v, found := sshClients[key]; found {
|
||||||
traceLog.Info("[ssh]: discard cached ssh client")
|
traceLog.Info("[ssh]: discard cached ssh client", "activeSessions", v.activeSessions)
|
||||||
|
closeConn := func() {
|
||||||
|
// run as async goroutine to minimise mutex time in immediate closures.
|
||||||
|
go func() {
|
||||||
|
if v.Client != nil {
|
||||||
|
_ = v.Client.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// if no active sessions for this connection, close it right-away.
|
||||||
|
// otherwise, it may be used by other processes, so remove from cache,
|
||||||
|
// and schedule a delayed closure.
|
||||||
|
if v.activeSessions == 0 {
|
||||||
|
traceLog.Info("[ssh]: closing connection")
|
||||||
|
closeConn()
|
||||||
|
} else {
|
||||||
|
go func() {
|
||||||
|
// the delay must account for in-flight operations
|
||||||
|
// that depends on this connection.
|
||||||
|
time.Sleep(120 * time.Second)
|
||||||
|
traceLog.Info("[ssh]: closing connection after delay")
|
||||||
|
closeConn()
|
||||||
|
}()
|
||||||
|
}
|
||||||
delete(sshClients, key)
|
delete(sshClients, key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func decrementActiveSessionIfFound(key string) {
|
||||||
|
aMux.Lock()
|
||||||
|
defer aMux.Unlock()
|
||||||
|
|
||||||
|
if v, found := sshClients[key]; found {
|
||||||
|
v.activeSessions--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue