Simplify port-forwarding code (#2976)

* Simplify port-forwarding code

Simplifies the establishment of a port-forwarding by moving the common
logic into `PortForward.Init()`

Stemmed from this
[comment](https://github.com/linkerd/linkerd2/pull/2937#discussion_r295078800)

Signed-off-by: Alejandro Pedraza <alejandro@buoyant.io>
This commit is contained in:
Alejandro Pedraza 2019-06-26 11:14:57 -05:00 committed by GitHub
parent 2ca8fbcb8c
commit 73740fb503
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 57 additions and 53 deletions

View File

@ -71,7 +71,6 @@ func newCmdDashboard() *cobra.Command {
return err
}
wait := make(chan struct{}, 1)
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
defer signal.Stop(signals)
@ -89,23 +88,17 @@ func newCmdDashboard() *cobra.Command {
os.Exit(1)
}
go func() {
err := portforward.Run()
if err != nil {
// TODO: consider falling back to an ephemeral port if defaultPort is taken
fmt.Fprintf(os.Stderr, "Error running port-forward: %s\nCheck for `linkerd dashboard` running in other terminal sessions, or use the `--port` flag.\n", err)
os.Exit(1)
}
close(wait)
}()
if err = portforward.Init(); err != nil {
// TODO: consider falling back to an ephemeral port if defaultPort is taken
fmt.Fprintf(os.Stderr, "Error running port-forward: %s\nCheck for `linkerd dashboard` running in other terminal sessions, or use the `--port` flag.\n", err)
os.Exit(1)
}
go func() {
<-signals
portforward.Stop()
}()
<-portforward.Ready()
webURL := portforward.URLFor("")
grafanaURL := portforward.URLFor("/grafana")
@ -133,7 +126,7 @@ func newCmdDashboard() *cobra.Command {
// no-op, we already printed the URLs
}
<-wait
<-portforward.GetStop()
return nil
},
}

View File

@ -161,16 +161,9 @@ func getMetrics(
}
defer portforward.Stop()
go func() {
err := portforward.Run()
if err != nil {
fmt.Fprintf(os.Stderr, "Error running port-forward: %s", err)
portforward.Stop()
}
}()
<-portforward.Ready()
if err = portforward.Init(); err != nil {
fmt.Fprintf(os.Stderr, "Error running port-forward: %s", err)
}
metricsURL := portforward.URLFor("/metrics")
resp, err := http.Get(metricsURL)

View File

@ -252,27 +252,7 @@ func NewExternalClient(controlPlaneNamespace string, kubeAPI *k8s.KubernetesAPI)
return nil, err
}
log.Debugf("Starting port forward on [%s]", apiURL)
wait := make(chan error, 1)
go func() {
if err := portforward.Run(); err != nil {
wait <- err
}
portforward.Stop()
}()
select {
case <-portforward.Ready():
log.Debugf("Port forward initialised")
break
case err := <-wait:
log.Debugf("Port forward failed: %v", err)
if err = portforward.Init(); err != nil {
return nil, err
}

View File

@ -9,6 +9,7 @@ import (
"os"
"strings"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
@ -137,8 +138,9 @@ func newPortForward(
}, nil
}
// Run creates and runs the port-forward connection.
func (pf *PortForward) Run() error {
// run creates and runs the port-forward connection.
// When the connection is established it blocks until Stop() is called.
func (pf *PortForward) run() error {
transport, upgrader, err := spdy.RoundTripperFor(pf.config)
if err != nil {
return err
@ -162,11 +164,40 @@ func (pf *PortForward) Run() error {
return fw.ForwardPorts()
}
// Ready returns a channel that will receive a message when the port-forward
// connection is ready. Clients should block and wait for the message before
// using the port-forward connection.
func (pf *PortForward) Ready() <-chan struct{} {
return pf.readyCh
// Init creates and runs a port-forward connection.
// This function blocks until the connection is established, in which case it returns nil.
// It's the caller's responsibility to call Stop() to finish the connection.
func (pf *PortForward) Init() error {
log.Debugf("Starting port forward to %s %d:%d", pf.url, pf.localPort, pf.remotePort)
failure := make(chan error)
go func() {
if err := pf.run(); err != nil {
failure <- err
}
select {
case <-pf.GetStop():
// stopCh was closed, do nothing
default:
// pf.run() returned for some other reason, close stopCh
pf.Stop()
}
}()
// The `select` statement below depends on one of two outcomes from `pf.run()`:
// 1) Succeed and block, causing a receive on `<-pf.readyCh`
// 2) Return an err, causing a receive `<-failure`
select {
case <-pf.readyCh:
log.Debug("Port forward initialised")
case err := <-failure:
log.Debugf("Port forward failed: %v", err)
return err
}
return nil
}
// Stop terminates the port-forward connection.
@ -174,6 +205,12 @@ func (pf *PortForward) Stop() {
close(pf.stopCh)
}
// GetStop returns the stopCh.
// Receiving on stopCh will block until the port forwarding stops.
func (pf *PortForward) GetStop() <-chan struct{} {
return pf.stopCh
}
// URLFor returns the URL for the port-forward connection.
func (pf *PortForward) URLFor(path string) string {
return fmt.Sprintf("http://127.0.0.1:%d%s", pf.localPort, path)

View File

@ -260,8 +260,9 @@ func (h *KubernetesHelper) URLFor(namespace, deployName string, remotePort int)
return "", err
}
go pf.Run()
<-pf.Ready()
if err = pf.Init(); err != nil {
return "", err
}
return pf.URLFor(""), nil
}