diff --git a/cli/cmd/profile.go b/cli/cmd/profile.go index 2e58db4c1..12e3241c8 100644 --- a/cli/cmd/profile.go +++ b/cli/cmd/profile.go @@ -120,7 +120,7 @@ func newCmdProfile() *cobra.Command { } else if options.openAPI != "" { return profiles.RenderOpenAPI(options.openAPI, options.namespace, options.name, clusterDomain, os.Stdout) } else if options.tap != "" { - return profiles.RenderTapOutputProfile(k8sAPI, options.tap, options.namespace, options.name, clusterDomain, options.tapDuration, int(options.tapRouteLimit), os.Stdout) + return profiles.RenderTapOutputProfile(cmd.Context(), k8sAPI, options.tap, options.namespace, options.name, clusterDomain, options.tapDuration, int(options.tapRouteLimit), os.Stdout) } else if options.proto != "" { return profiles.RenderProto(options.proto, options.namespace, options.name, clusterDomain, os.Stdout) } diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index fc28b8ea7..0195a03c1 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -2,6 +2,7 @@ package cmd import ( "bufio" + "context" "encoding/json" "fmt" "io" @@ -199,7 +200,7 @@ func newCmdTap() *cobra.Command { return err } - return requestTapByResourceFromAPI(os.Stdout, k8sAPI, req, options) + return requestTapByResourceFromAPI(cmd.Context(), os.Stdout, k8sAPI, req, options) }, } @@ -227,8 +228,8 @@ func newCmdTap() *cobra.Command { return cmd } -func requestTapByResourceFromAPI(w io.Writer, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, options *tapOptions) error { - reader, body, err := tap.Reader(k8sAPI, req, 0) +func requestTapByResourceFromAPI(ctx context.Context, w io.Writer, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, options *tapOptions) error { + reader, body, err := tap.Reader(ctx, k8sAPI, req) if err != nil { return err } diff --git a/cli/cmd/tap_test.go b/cli/cmd/tap_test.go index fbc0fc6fe..ada59bb7b 100644 --- a/cli/cmd/tap_test.go +++ b/cli/cmd/tap_test.go @@ -2,6 +2,7 @@ package cmd import ( "bytes" + "context" "io/ioutil" "net/http" "net/http/httptest" @@ -131,7 +132,7 @@ func busyTest(t *testing.T, output string) { options.output = output writer := bytes.NewBufferString("") - err = requestTapByResourceFromAPI(writer, kubeAPI, req, options) + err = requestTapByResourceFromAPI(context.Background(), writer, kubeAPI, req, options) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -158,6 +159,8 @@ func busyTest(t *testing.T, output string) { } func TestRequestTapByResourceFromAPI(t *testing.T) { + + ctx := context.Background() t.Run("Should render busy response if everything went well", func(t *testing.T) { busyTest(t, "") }) @@ -197,7 +200,7 @@ func TestRequestTapByResourceFromAPI(t *testing.T) { options := newTapOptions() writer := bytes.NewBufferString("") - err = requestTapByResourceFromAPI(writer, kubeAPI, req, options) + err = requestTapByResourceFromAPI(ctx, writer, kubeAPI, req, options) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -236,7 +239,7 @@ func TestRequestTapByResourceFromAPI(t *testing.T) { options := newTapOptions() writer := bytes.NewBufferString("") - err = requestTapByResourceFromAPI(writer, kubeAPI, req, options) + err = requestTapByResourceFromAPI(ctx, writer, kubeAPI, req, options) if err == nil { t.Fatalf("Expecting error, got nothing but output [%s]", writer.String()) } diff --git a/cli/cmd/top.go b/cli/cmd/top.go index 2378caec5..75b7e2345 100644 --- a/cli/cmd/top.go +++ b/cli/cmd/top.go @@ -2,6 +2,7 @@ package cmd import ( "bufio" + "context" "fmt" "io" "sort" @@ -355,7 +356,7 @@ func newCmdTop() *cobra.Command { return err } - return getTrafficByResourceFromAPI(k8sAPI, req, table) + return getTrafficByResourceFromAPI(cmd.Context(), k8sAPI, req, table) }, } @@ -382,8 +383,8 @@ func newCmdTop() *cobra.Command { return cmd } -func getTrafficByResourceFromAPI(k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, table *topTable) error { - reader, body, err := tap.Reader(k8sAPI, req, 0) +func getTrafficByResourceFromAPI(ctx context.Context, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, table *topTable) error { + reader, body, err := tap.Reader(ctx, k8sAPI, req) if err != nil { return err } diff --git a/pkg/profiles/tap.go b/pkg/profiles/tap.go index bddf60c6f..0d43d611e 100644 --- a/pkg/profiles/tap.go +++ b/pkg/profiles/tap.go @@ -2,6 +2,7 @@ package profiles import ( "bufio" + "context" "fmt" "io" "os" @@ -23,7 +24,7 @@ import ( // RenderTapOutputProfile performs a tap on the desired resource and generates // a service profile with routes pre-populated from the tap data // Only inbound tap traffic is considered. -func RenderTapOutputProfile(k8sAPI *k8s.KubernetesAPI, tapResource, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int, w io.Writer) error { +func RenderTapOutputProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapResource, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int, w io.Writer) error { requestParams := util.TapRequestParams{ Resource: tapResource, Namespace: namespace, @@ -35,7 +36,7 @@ func RenderTapOutputProfile(k8sAPI *k8s.KubernetesAPI, tapResource, namespace, n return err } - profile, err := tapToServiceProfile(k8sAPI, req, namespace, name, clusterDomain, tapDuration, routeLimit) + profile, err := tapToServiceProfile(ctx, k8sAPI, req, namespace, name, clusterDomain, tapDuration, routeLimit) if err != nil { return err } @@ -48,7 +49,7 @@ func RenderTapOutputProfile(k8sAPI *k8s.KubernetesAPI, tapResource, namespace, n return nil } -func tapToServiceProfile(k8sAPI *k8s.KubernetesAPI, tapReq *pb.TapByResourceRequest, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int) (sp.ServiceProfile, error) { +func tapToServiceProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapReq *pb.TapByResourceRequest, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int) (sp.ServiceProfile, error) { profile := sp.ServiceProfile{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s.%s.svc.%s", name, namespace, clusterDomain), @@ -57,7 +58,9 @@ func tapToServiceProfile(k8sAPI *k8s.KubernetesAPI, tapReq *pb.TapByResourceRequ TypeMeta: serviceProfileMeta, } - reader, body, err := tap.Reader(k8sAPI, tapReq, tapDuration) + ctxWithTime, cancel := context.WithTimeout(ctx, tapDuration) + defer cancel() + reader, body, err := tap.Reader(ctxWithTime, k8sAPI, tapReq) if err != nil { return profile, err } @@ -81,7 +84,8 @@ func routeSpecFromTap(tapByteStream *bufio.Reader, routeLimit int) []*sp.RouteSp if err != nil { // expected errors when hitting the tapDuration deadline if err != io.EOF && - !strings.HasSuffix(err.Error(), "(Client.Timeout exceeded while reading body)") && + !strings.HasSuffix(err.Error(), "(Client.Timeout or context cancellation while reading body)") && + !strings.HasSuffix(err.Error(), "context deadline exceeded") && !strings.HasSuffix(err.Error(), "http2: response body closed") { fmt.Fprintln(os.Stderr, err) } diff --git a/pkg/profiles/tap_test.go b/pkg/profiles/tap_test.go index 742f91664..4bb286ff0 100644 --- a/pkg/profiles/tap_test.go +++ b/pkg/profiles/tap_test.go @@ -1,6 +1,7 @@ package profiles import ( + "context" "net/http" "net/http/httptest" "testing" @@ -119,7 +120,7 @@ func TestTapToServiceProfile(t *testing.T) { }, } - actualServiceProfile, err := tapToServiceProfile(kubeAPI, tapReq, namespace, name, clusterDomain, tapDuration, routeLimit) + actualServiceProfile, err := tapToServiceProfile(context.Background(), kubeAPI, tapReq, namespace, name, clusterDomain, tapDuration, routeLimit) if err != nil { t.Fatalf("Failed to create ServiceProfile: %v", err) } diff --git a/pkg/tap/tap.go b/pkg/tap/tap.go index 80d6b903b..1c1a78617 100644 --- a/pkg/tap/tap.go +++ b/pkg/tap/tap.go @@ -3,11 +3,11 @@ package tap import ( "bufio" "bytes" + "context" "fmt" "io" "net/http" "net/url" - "time" "github.com/golang/protobuf/proto" pb "github.com/linkerd/linkerd2/controller/gen/public" @@ -22,12 +22,11 @@ const TapRbacURL = "https://linkerd.io/tap-rbac" // Reader initiates a TapByResourceRequest and returns a buffered Reader. // It is the caller's responsibility to call Close() on the io.ReadCloser. -func Reader(k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, timeout time.Duration) (*bufio.Reader, io.ReadCloser, error) { +func Reader(ctx context.Context, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest) (*bufio.Reader, io.ReadCloser, error) { client, err := k8sAPI.NewClient() if err != nil { return nil, nil, err } - client.Timeout = timeout reqBytes, err := proto.Marshal(req) if err != nil { @@ -49,7 +48,7 @@ func Reader(k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, timeout tim return nil, nil, err } - httpRsp, err := client.Do(httpReq) + httpRsp, err := client.Do(httpReq.WithContext(ctx)) if err != nil { log.Debugf("Error invoking [%s]: %v", url, err) return nil, nil, err diff --git a/web/srv/api_handlers.go b/web/srv/api_handlers.go index 53d1d002d..13be10b44 100644 --- a/web/srv/api_handlers.go +++ b/web/srv/api_handlers.go @@ -269,7 +269,7 @@ func (h *handler) handleAPITap(w http.ResponseWriter, req *http.Request, p httpr } go func() { - reader, body, err := tap.Reader(h.k8sAPI, tapReq, 0) + reader, body, err := tap.Reader(req.Context(), h.k8sAPI, tapReq) if err != nil { // If there was a [403] error when initiating a tap, close the // socket with `ClosePolicyViolation` status code so that the error