libgit2: enforce context timeout
Some scenarios could lead a goroutine to be running indefinetely within managed ssh. Previously between the two git operations, the reconciliation could take twice the timeout set for the Flux object. Signed-off-by: Paulo Gomes <paulo.gomes@weave.works>
This commit is contained in:
parent
7953d0ea91
commit
978148ea71
|
@ -721,7 +721,10 @@ func (r *GitRepositoryReconciler) gitCheckout(ctx context.Context,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
checkoutStrategy, err := strategy.CheckoutStrategyForImplementation(ctx,
|
gitCtx, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
checkoutStrategy, err := strategy.CheckoutStrategyForImplementation(gitCtx,
|
||||||
git.Implementation(obj.Spec.GitImplementation), checkoutOpts)
|
git.Implementation(obj.Spec.GitImplementation), checkoutOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Do not return err as recovery without changes is impossible.
|
// Do not return err as recovery without changes is impossible.
|
||||||
|
@ -753,10 +756,6 @@ func (r *GitRepositoryReconciler) gitCheckout(ctx context.Context,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checkout HEAD of reference in object
|
|
||||||
gitCtx, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
commit, err := checkoutStrategy.Checkout(gitCtx, dir, obj.Spec.URL, authOpts)
|
commit, err := checkoutStrategy.Checkout(gitCtx, dir, obj.Spec.URL, authOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e := serror.NewGeneric(
|
e := serror.NewGeneric(
|
||||||
|
|
|
@ -91,6 +91,7 @@ func (c *CheckoutBranch) Checkout(ctx context.Context, path, url string, opts *g
|
||||||
TargetURL: url,
|
TargetURL: url,
|
||||||
AuthOpts: opts,
|
AuthOpts: opts,
|
||||||
ProxyOptions: &git2go.ProxyOptions{Type: git2go.ProxyTypeAuto},
|
ProxyOptions: &git2go.ProxyOptions{Type: git2go.ProxyTypeAuto},
|
||||||
|
Context: ctx,
|
||||||
})
|
})
|
||||||
url = opts.TransportOptionsURL
|
url = opts.TransportOptionsURL
|
||||||
remoteCallBacks := managed.RemoteCallbacks()
|
remoteCallBacks := managed.RemoteCallbacks()
|
||||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||||
package managed
|
package managed
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/fluxcd/source-controller/pkg/git"
|
"github.com/fluxcd/source-controller/pkg/git"
|
||||||
|
@ -29,6 +30,7 @@ type TransportOptions struct {
|
||||||
TargetURL string
|
TargetURL string
|
||||||
AuthOpts *git.AuthOptions
|
AuthOpts *git.AuthOptions
|
||||||
ProxyOptions *git2go.ProxyOptions
|
ProxyOptions *git2go.ProxyOptions
|
||||||
|
Context context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -92,6 +92,7 @@ type sshSmartSubtransport struct {
|
||||||
currentStream *sshSmartSubtransportStream
|
currentStream *sshSmartSubtransportStream
|
||||||
addr string
|
addr string
|
||||||
connected bool
|
connected bool
|
||||||
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
|
func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
|
||||||
|
@ -103,6 +104,8 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
|
||||||
return nil, fmt.Errorf("could not find transport options for object: %s", transportOptionsURL)
|
return nil, fmt.Errorf("could not find transport options for object: %s", transportOptionsURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.ctx = opts.Context
|
||||||
|
|
||||||
u, err := url.Parse(opts.TargetURL)
|
u, err := url.Parse(opts.TargetURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -206,16 +209,33 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
|
||||||
// xref: https://github.com/golang/crypto/blob/eb4f295cb31f7fb5d52810411604a2638c9b19a2/ssh/session.go#L553-L558
|
// xref: https://github.com/golang/crypto/blob/eb4f295cb31f7fb5d52810411604a2638c9b19a2/ssh/session.go#L553-L558
|
||||||
go func() error {
|
go func() error {
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
for {
|
|
||||||
if !t.connected {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := io.Copy(w, reader)
|
var cancel context.CancelFunc
|
||||||
if err != nil {
|
ctx := t.ctx
|
||||||
return err
|
|
||||||
|
// When context is nil, creates a new with internal SSH connection timeout.
|
||||||
|
if ctx == nil {
|
||||||
|
ctx, cancel = context.WithTimeout(context.Background(), sshConnectionTimeOut)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Close()
|
||||||
|
return nil
|
||||||
|
|
||||||
|
default:
|
||||||
|
if !t.connected {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := io.Copy(w, reader)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
time.Sleep(5 * time.Millisecond)
|
||||||
}
|
}
|
||||||
time.Sleep(5 * time.Millisecond)
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
|
@ -292,7 +292,7 @@ func TestCheckoutStrategyForImplementation_Proxied(t *testing.T) {
|
||||||
|
|
||||||
return nil, func() {}
|
return nil, func() {}
|
||||||
},
|
},
|
||||||
shortTimeout: false,
|
shortTimeout: true,
|
||||||
wantUsedProxy: false,
|
wantUsedProxy: false,
|
||||||
wantError: true,
|
wantError: true,
|
||||||
},
|
},
|
||||||
|
|
Loading…
Reference in New Issue