Merge pull request #713 from pjbgf/libgit2-cacheless-conns
libgit2: Disable connection caching
This commit is contained in:
commit
e180b3c944
2
go.mod
2
go.mod
|
@ -42,6 +42,7 @@ require (
|
|||
github.com/prometheus/client_golang v1.12.1
|
||||
github.com/spf13/pflag v1.0.5
|
||||
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f
|
||||
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
||||
google.golang.org/api v0.77.0
|
||||
gotest.tools v2.2.0+incompatible
|
||||
|
@ -200,7 +201,6 @@ require (
|
|||
go.uber.org/atomic v1.7.0 // indirect
|
||||
go.uber.org/multierr v1.6.0 // indirect
|
||||
go.uber.org/zap v1.21.0 // indirect
|
||||
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
|
||||
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
|
||||
|
|
|
@ -44,6 +44,7 @@ THE SOFTWARE.
|
|||
package managed
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"crypto/sha1"
|
||||
"crypto/sha256"
|
||||
|
@ -53,10 +54,10 @@ import (
|
|||
"net/url"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/crypto/ssh"
|
||||
"golang.org/x/net/proxy"
|
||||
|
||||
"github.com/fluxcd/source-controller/pkg/git"
|
||||
git2go "github.com/libgit2/git2go/v33"
|
||||
|
@ -65,17 +66,6 @@ import (
|
|||
// registerManagedSSH registers a Go-native implementation of
|
||||
// SSH transport that doesn't rely on any lower-level libraries
|
||||
// such as libssh2.
|
||||
//
|
||||
// The underlying SSH connections are kept open and are reused
|
||||
// across several SSH sessions. This is due to upstream issues in
|
||||
// which concurrent/parallel SSH connections may lead to instability.
|
||||
//
|
||||
// Connections are created on first attempt to use a given remote. The
|
||||
// connection is removed from the cache on the first failed session related
|
||||
// operation.
|
||||
//
|
||||
// https://github.com/golang/go/issues/51926
|
||||
// https://github.com/golang/go/issues/27140
|
||||
func registerManagedSSH() error {
|
||||
for _, protocol := range []string{"ssh", "ssh+git", "git+ssh"} {
|
||||
_, err := git2go.NewRegisteredSmartTransport(protocol, false, sshSmartSubtransportFactory)
|
||||
|
@ -96,32 +86,16 @@ type sshSmartSubtransport struct {
|
|||
transport *git2go.Transport
|
||||
|
||||
lastAction git2go.SmartServiceAction
|
||||
conn net.Conn
|
||||
client *ssh.Client
|
||||
session *ssh.Session
|
||||
stdin io.WriteCloser
|
||||
stdout io.Reader
|
||||
currentStream *sshSmartSubtransportStream
|
||||
ckey string
|
||||
addr string
|
||||
connected bool
|
||||
}
|
||||
|
||||
// aMux is the read-write mutex to control access to sshClients.
|
||||
var aMux sync.RWMutex
|
||||
|
||||
type cachedClient struct {
|
||||
*ssh.Client
|
||||
activeSessions uint16
|
||||
}
|
||||
|
||||
// sshClients stores active ssh clients/connections to be reused.
|
||||
//
|
||||
// Once opened, connections will be kept cached until an error occurs
|
||||
// during SSH commands, by which point it will be discarded, leading to
|
||||
// a follow-up cache miss.
|
||||
//
|
||||
// The key must be based on cacheKey, refer to that function's comments.
|
||||
var sshClients map[string]*cachedClient = make(map[string]*cachedClient)
|
||||
|
||||
func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
@ -152,8 +126,6 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
|||
if t.lastAction == git2go.SmartServiceActionUploadpackLs {
|
||||
return t.currentStream, nil
|
||||
}
|
||||
// Disregard errors from previous stream, further details inside Close().
|
||||
_ = t.Close()
|
||||
}
|
||||
cmd = fmt.Sprintf("git-upload-pack '%s'", uPath)
|
||||
|
||||
|
@ -162,8 +134,6 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
|||
if t.lastAction == git2go.SmartServiceActionReceivepackLs {
|
||||
return t.currentStream, nil
|
||||
}
|
||||
// Disregard errors from previous stream, further details inside Close().
|
||||
_ = t.Close()
|
||||
}
|
||||
cmd = fmt.Sprintf("git-receive-pack '%s'", uPath)
|
||||
|
||||
|
@ -171,25 +141,27 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
|||
return nil, fmt.Errorf("unexpected action: %v", action)
|
||||
}
|
||||
|
||||
if t.connected {
|
||||
// Disregard errors from previous stream, further details inside Close().
|
||||
_ = t.Close()
|
||||
}
|
||||
|
||||
cred, err := t.transport.SmartCredentials("", git2go.CredentialTypeSSHMemory)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer cred.Free()
|
||||
|
||||
var addr string
|
||||
port := "22"
|
||||
if u.Port() != "" {
|
||||
port = u.Port()
|
||||
}
|
||||
addr = fmt.Sprintf("%s:%s", u.Hostname(), port)
|
||||
t.addr = addr
|
||||
t.addr = net.JoinHostPort(u.Hostname(), port)
|
||||
|
||||
ckey, sshConfig, err := cacheKeyAndConfig(addr, cred)
|
||||
sshConfig, err := clientConfig(t.addr, cred)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.ckey = ckey
|
||||
|
||||
sshConfig.HostKeyCallback = func(hostname string, remote net.Addr, key ssh.PublicKey) error {
|
||||
marshaledKey := key.Marshal()
|
||||
|
@ -208,62 +180,49 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
|||
return t.transport.SmartCertificateCheck(cert, true, hostname)
|
||||
}
|
||||
|
||||
var cacheHit bool
|
||||
aMux.Lock()
|
||||
if c, ok := sshClients[ckey]; ok {
|
||||
traceLog.Info("[ssh]: cache hit", "remoteAddress", addr)
|
||||
t.client = c.Client
|
||||
cacheHit = true
|
||||
c.activeSessions++
|
||||
}
|
||||
aMux.Unlock()
|
||||
|
||||
if t.client == nil {
|
||||
cacheHit = false
|
||||
traceLog.Info("[ssh]: cache miss", "remoteAddress", addr)
|
||||
err := t.createConn(ckey, addr, sshConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = t.createConn(t.addr, sshConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.connected = true
|
||||
|
||||
traceLog.Info("[ssh]: creating new ssh session")
|
||||
if t.session, err = t.client.NewSession(); err != nil {
|
||||
discardCachedSshClient(ckey)
|
||||
|
||||
// if the current connection was cached, we can try again
|
||||
// as this may be a stale connection.
|
||||
if !cacheHit {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
traceLog.Info("[ssh]: cached connection was stale, retrying...")
|
||||
err = t.createConn(ckey, addr, sshConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
traceLog.Info("[ssh]: creating new ssh session with new connection")
|
||||
t.session, err = t.client.NewSession()
|
||||
if err != nil {
|
||||
discardCachedSshClient(ckey)
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if t.stdin, err = t.session.StdinPipe(); err != nil {
|
||||
discardCachedSshClient(ckey)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if t.stdout, err = t.session.StdoutPipe(); err != nil {
|
||||
discardCachedSshClient(ckey)
|
||||
var w *io.PipeWriter
|
||||
var reader io.Reader
|
||||
t.stdout, w = io.Pipe()
|
||||
if reader, err = t.session.StdoutPipe(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If the session's stdout pipe is not serviced fast
|
||||
// enough it may cause the remote command to block.
|
||||
//
|
||||
// xref: https://github.com/golang/crypto/blob/eb4f295cb31f7fb5d52810411604a2638c9b19a2/ssh/session.go#L553-L558
|
||||
go func() error {
|
||||
defer w.Close()
|
||||
for {
|
||||
if !t.connected {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := io.Copy(w, reader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
|
||||
traceLog.Info("[ssh]: run on remote", "cmd", cmd)
|
||||
if err := t.session.Start(cmd); err != nil {
|
||||
discardCachedSshClient(ckey)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -275,55 +234,21 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
|
|||
return t.currentStream, nil
|
||||
}
|
||||
|
||||
func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.ClientConfig) error {
|
||||
// In some scenarios the ssh handshake can hang indefinitely at
|
||||
// golang.org/x/crypto/ssh.(*handshakeTransport).kexLoop.
|
||||
//
|
||||
// xref: https://github.com/golang/go/issues/51926
|
||||
done := make(chan error, 1)
|
||||
var err error
|
||||
func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConfig) error {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), sshConnectionTimeOut)
|
||||
defer cancel()
|
||||
|
||||
var c *ssh.Client
|
||||
go func() {
|
||||
c, err = ssh.Dial("tcp", addr, sshConfig)
|
||||
done <- err
|
||||
}()
|
||||
|
||||
dialTimeout := sshConfig.Timeout + (30 * time.Second)
|
||||
|
||||
select {
|
||||
case doneErr := <-done:
|
||||
if doneErr != nil {
|
||||
err = fmt.Errorf("ssh.Dial: %w", doneErr)
|
||||
}
|
||||
case <-time.After(dialTimeout):
|
||||
err = fmt.Errorf("timed out waiting for ssh.Dial after %s", dialTimeout)
|
||||
conn, err := proxy.Dial(ctx, "tcp", addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c, chans, reqs, err := ssh.NewClientConn(conn, addr, sshConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.client = c
|
||||
|
||||
// Mutex is set here to avoid the network latency being
|
||||
// absorbed by all competing goroutines.
|
||||
aMux.Lock()
|
||||
defer aMux.Unlock()
|
||||
|
||||
// A different goroutine won the race, dispose the connection
|
||||
// and carry on.
|
||||
if _, ok := sshClients[ckey]; ok {
|
||||
go func() {
|
||||
_ = c.Close()
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
sshClients[ckey] = &cachedClient{
|
||||
Client: c,
|
||||
activeSessions: 1,
|
||||
}
|
||||
t.conn = conn
|
||||
t.client = ssh.NewClient(c, chans, reqs)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -356,6 +281,14 @@ func (t *sshSmartSubtransport) Close() error {
|
|||
|
||||
func (t *sshSmartSubtransport) Free() {
|
||||
traceLog.Info("[ssh]: sshSmartSubtransport.Free()")
|
||||
if t.client != nil {
|
||||
_ = t.client.Close()
|
||||
}
|
||||
|
||||
if t.conn != nil {
|
||||
_ = t.conn.Close()
|
||||
}
|
||||
t.connected = false
|
||||
}
|
||||
|
||||
type sshSmartSubtransportStream struct {
|
||||
|
@ -372,36 +305,25 @@ func (stream *sshSmartSubtransportStream) Write(buf []byte) (int, error) {
|
|||
|
||||
func (stream *sshSmartSubtransportStream) Free() {
|
||||
traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()")
|
||||
if stream.owner == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if stream.owner.ckey != "" {
|
||||
decrementActiveSessionIfFound(stream.owner.ckey)
|
||||
}
|
||||
}
|
||||
|
||||
func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, *ssh.ClientConfig, error) {
|
||||
func clientConfig(remoteAddress string, cred *git2go.Credential) (*ssh.ClientConfig, error) {
|
||||
if cred == nil {
|
||||
return "", nil, fmt.Errorf("cannot create cache key from a nil credential")
|
||||
return nil, fmt.Errorf("cannot create ssh client config from a nil credential")
|
||||
}
|
||||
|
||||
username, _, privatekey, passphrase, err := cred.GetSSHKey()
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var pemBytes []byte
|
||||
if cred.Type() == git2go.CredentialTypeSSHMemory {
|
||||
pemBytes = []byte(privatekey)
|
||||
} else {
|
||||
return "", nil, fmt.Errorf("file based SSH credential is not supported")
|
||||
return nil, fmt.Errorf("file based SSH credential is not supported")
|
||||
}
|
||||
|
||||
// must include the passphrase, otherwise a caller that knows the private key, but
|
||||
// not its passphrase would be able to bypass auth.
|
||||
ck := cacheKey(remoteAddress, username, passphrase, pemBytes)
|
||||
|
||||
var key ssh.Signer
|
||||
if passphrase != "" {
|
||||
key, err = ssh.ParsePrivateKeyWithPassphrase(pemBytes, []byte(passphrase))
|
||||
|
@ -410,7 +332,7 @@ func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, *
|
|||
}
|
||||
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg := &ssh.ClientConfig{
|
||||
|
@ -425,68 +347,5 @@ func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, *
|
|||
cfg.HostKeyAlgorithms = git.HostKeyAlgos
|
||||
}
|
||||
|
||||
return ck, cfg, nil
|
||||
}
|
||||
|
||||
// cacheKey generates a cache key that is multi-tenancy safe.
//
// Establishing multiple and concurrent ssh connections leads to stability
// issues (see https://github.com/golang/go/issues/51926 and
// https://github.com/golang/go/issues/27140). However, the caching/sharing of
// already established connections could represent a vector for users to
// bypass the ssh authentication mechanism.
//
// cacheKey tries to ensure that connections are only shared by users that
// have the exact same remoteAddress and credentials.
//
// Parameters:
//   - remoteAddress: the host:port of the remote.
//   - userName: the ssh user name.
//   - passphrase: the passphrase protecting the key material (may be empty).
//   - pubKey: the key bytes identifying the credential.
//
// It returns the lowercase hex encoding of the SHA-256 digest of the inputs.
func cacheKey(remoteAddress, userName, passphrase string, pubKey []byte) string {
	// Each string field is rendered with %q so that it is quoted and escaped.
	// A plain "%s-%s-%s" join is ambiguous: ("host-a", "b") and ("host", "a-b")
	// would serialise to the same text and therefore share a cache entry.
	v := fmt.Sprintf("%q-%q-%q-%v", remoteAddress, userName, passphrase, pubKey)

	return fmt.Sprintf("%x", sha256.Sum256([]byte(v)))
}
|
||||
|
||||
// discardCachedSshClient discards the cached ssh client, forcing the next git operation
|
||||
// to create a new one via ssh.Dial.
|
||||
func discardCachedSshClient(key string) {
|
||||
aMux.Lock()
|
||||
defer aMux.Unlock()
|
||||
|
||||
if v, found := sshClients[key]; found {
|
||||
traceLog.Info("[ssh]: discard cached ssh client", "activeSessions", v.activeSessions)
|
||||
closeConn := func() {
|
||||
// run as async goroutine to minimise mutex time in immediate closures.
|
||||
go func() {
|
||||
if v.Client != nil {
|
||||
_ = v.Client.Close()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// if no active sessions for this connection, close it right-away.
|
||||
// otherwise, it may be used by other processes, so remove from cache,
|
||||
// and schedule a delayed closure.
|
||||
if v.activeSessions == 0 {
|
||||
traceLog.Info("[ssh]: closing connection")
|
||||
closeConn()
|
||||
} else {
|
||||
go func() {
|
||||
// the delay must account for in-flight operations
|
||||
// that depends on this connection.
|
||||
time.Sleep(120 * time.Second)
|
||||
traceLog.Info("[ssh]: closing connection after delay")
|
||||
closeConn()
|
||||
}()
|
||||
}
|
||||
delete(sshClients, key)
|
||||
}
|
||||
}
|
||||
|
||||
func decrementActiveSessionIfFound(key string) {
|
||||
aMux.Lock()
|
||||
defer aMux.Unlock()
|
||||
|
||||
if v, found := sshClients[key]; found {
|
||||
v.activeSessions--
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
|
|
@ -1,124 +0,0 @@
|
|||
/*
|
||||
Copyright 2022 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package managed
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCacheKey(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
remoteAddress1 string
|
||||
user1 string
|
||||
passphrase1 string
|
||||
pubKey1 []byte
|
||||
remoteAddress2 string
|
||||
user2 string
|
||||
passphrase2 string
|
||||
pubKey2 []byte
|
||||
expectMatch bool
|
||||
}{
|
||||
{
|
||||
name: "same remote addresses with no config",
|
||||
remoteAddress1: "1.1.1.1",
|
||||
remoteAddress2: "1.1.1.1",
|
||||
expectMatch: true,
|
||||
},
|
||||
{
|
||||
name: "same remote addresses with different config",
|
||||
remoteAddress1: "1.1.1.1",
|
||||
user1: "joe",
|
||||
remoteAddress2: "1.1.1.1",
|
||||
user2: "another-joe",
|
||||
expectMatch: false,
|
||||
},
|
||||
{
|
||||
name: "different remote addresses with no config",
|
||||
remoteAddress1: "8.8.8.8",
|
||||
remoteAddress2: "1.1.1.1",
|
||||
expectMatch: false,
|
||||
},
|
||||
{
|
||||
name: "different remote addresses with same config",
|
||||
remoteAddress1: "8.8.8.8",
|
||||
user1: "legit",
|
||||
remoteAddress2: "1.1.1.1",
|
||||
user2: "legit",
|
||||
expectMatch: false,
|
||||
},
|
||||
{
|
||||
name: "same remote addresses with same pubkey signers",
|
||||
remoteAddress1: "1.1.1.1",
|
||||
user1: "same-jane",
|
||||
pubKey1: []byte{255, 123, 0},
|
||||
remoteAddress2: "1.1.1.1",
|
||||
user2: "same-jane",
|
||||
pubKey2: []byte{255, 123, 0},
|
||||
expectMatch: true,
|
||||
},
|
||||
{
|
||||
name: "same remote addresses with different pubkey signers",
|
||||
remoteAddress1: "1.1.1.1",
|
||||
user1: "same-jane",
|
||||
pubKey1: []byte{255, 123, 0},
|
||||
remoteAddress2: "1.1.1.1",
|
||||
user2: "same-jane",
|
||||
pubKey2: []byte{0, 123, 0},
|
||||
expectMatch: false,
|
||||
},
|
||||
{
|
||||
name: "same remote addresses with pubkey signers and passphrases",
|
||||
remoteAddress1: "1.1.1.1",
|
||||
user1: "same-jane",
|
||||
passphrase1: "same-pass",
|
||||
pubKey1: []byte{255, 123, 0},
|
||||
remoteAddress2: "1.1.1.1",
|
||||
user2: "same-jane",
|
||||
passphrase2: "same-pass",
|
||||
pubKey2: []byte{255, 123, 0},
|
||||
expectMatch: true,
|
||||
},
|
||||
{
|
||||
name: "same remote addresses with pubkey signers and different passphrases",
|
||||
remoteAddress1: "1.1.1.1",
|
||||
user1: "same-jane",
|
||||
passphrase1: "same-pass",
|
||||
pubKey1: []byte{255, 123, 0},
|
||||
remoteAddress2: "1.1.1.1",
|
||||
user2: "same-jane",
|
||||
passphrase2: "different-pass",
|
||||
pubKey2: []byte{255, 123, 0},
|
||||
expectMatch: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cacheKey1 := cacheKey(tt.remoteAddress1, tt.user1, tt.passphrase1, tt.pubKey1)
|
||||
cacheKey2 := cacheKey(tt.remoteAddress2, tt.user2, tt.passphrase2, tt.pubKey2)
|
||||
|
||||
if tt.expectMatch && cacheKey1 != cacheKey2 {
|
||||
t.Errorf("cache keys '%s' and '%s' should match", cacheKey1, cacheKey2)
|
||||
}
|
||||
|
||||
if !tt.expectMatch && cacheKey1 == cacheKey2 {
|
||||
t.Errorf("cache keys '%s' and '%s' should not match", cacheKey1, cacheKey2)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue