mirror of https://github.com/linkerd/linkerd2.git
cli: fix and update timeout warnings in profile cmd (#5122)
Fixes #5121 * cli: skip emitting warnings in Profile Whenever the tapDuration gets completed, there is a warning occured which we do not emit. This looks like it has been changed in the latest versions of the dependency. * Use context.withDeadline instead of client.timeout The usage of `client.Timeout` is not working correctly causing `W1022 17:20:12.372780 19049 transport.go:260] Unable to cancel request for promhttp.RoundTripperFunc` to be emitted by the Kubernetes Client. This is fixed by using context.WithDeadline and passing that into the http Request. Signed-off-by: Tarun Pothulapati <tarunpothulapati@outlook.com>
This commit is contained in:
parent
b7c5bd07ae
commit
39e7f84773
|
@ -120,7 +120,7 @@ func newCmdProfile() *cobra.Command {
|
||||||
} else if options.openAPI != "" {
|
} else if options.openAPI != "" {
|
||||||
return profiles.RenderOpenAPI(options.openAPI, options.namespace, options.name, clusterDomain, os.Stdout)
|
return profiles.RenderOpenAPI(options.openAPI, options.namespace, options.name, clusterDomain, os.Stdout)
|
||||||
} else if options.tap != "" {
|
} else if options.tap != "" {
|
||||||
return profiles.RenderTapOutputProfile(k8sAPI, options.tap, options.namespace, options.name, clusterDomain, options.tapDuration, int(options.tapRouteLimit), os.Stdout)
|
return profiles.RenderTapOutputProfile(cmd.Context(), k8sAPI, options.tap, options.namespace, options.name, clusterDomain, options.tapDuration, int(options.tapRouteLimit), os.Stdout)
|
||||||
} else if options.proto != "" {
|
} else if options.proto != "" {
|
||||||
return profiles.RenderProto(options.proto, options.namespace, options.name, clusterDomain, os.Stdout)
|
return profiles.RenderProto(options.proto, options.namespace, options.name, clusterDomain, os.Stdout)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
@ -199,7 +200,7 @@ func newCmdTap() *cobra.Command {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return requestTapByResourceFromAPI(os.Stdout, k8sAPI, req, options)
|
return requestTapByResourceFromAPI(cmd.Context(), os.Stdout, k8sAPI, req, options)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -227,8 +228,8 @@ func newCmdTap() *cobra.Command {
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
func requestTapByResourceFromAPI(w io.Writer, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, options *tapOptions) error {
|
func requestTapByResourceFromAPI(ctx context.Context, w io.Writer, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, options *tapOptions) error {
|
||||||
reader, body, err := tap.Reader(k8sAPI, req, 0)
|
reader, body, err := tap.Reader(ctx, k8sAPI, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
@ -131,7 +132,7 @@ func busyTest(t *testing.T, output string) {
|
||||||
options.output = output
|
options.output = output
|
||||||
|
|
||||||
writer := bytes.NewBufferString("")
|
writer := bytes.NewBufferString("")
|
||||||
err = requestTapByResourceFromAPI(writer, kubeAPI, req, options)
|
err = requestTapByResourceFromAPI(context.Background(), writer, kubeAPI, req, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -158,6 +159,8 @@ func busyTest(t *testing.T, output string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRequestTapByResourceFromAPI(t *testing.T) {
|
func TestRequestTapByResourceFromAPI(t *testing.T) {
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
t.Run("Should render busy response if everything went well", func(t *testing.T) {
|
t.Run("Should render busy response if everything went well", func(t *testing.T) {
|
||||||
busyTest(t, "")
|
busyTest(t, "")
|
||||||
})
|
})
|
||||||
|
@ -197,7 +200,7 @@ func TestRequestTapByResourceFromAPI(t *testing.T) {
|
||||||
|
|
||||||
options := newTapOptions()
|
options := newTapOptions()
|
||||||
writer := bytes.NewBufferString("")
|
writer := bytes.NewBufferString("")
|
||||||
err = requestTapByResourceFromAPI(writer, kubeAPI, req, options)
|
err = requestTapByResourceFromAPI(ctx, writer, kubeAPI, req, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -236,7 +239,7 @@ func TestRequestTapByResourceFromAPI(t *testing.T) {
|
||||||
|
|
||||||
options := newTapOptions()
|
options := newTapOptions()
|
||||||
writer := bytes.NewBufferString("")
|
writer := bytes.NewBufferString("")
|
||||||
err = requestTapByResourceFromAPI(writer, kubeAPI, req, options)
|
err = requestTapByResourceFromAPI(ctx, writer, kubeAPI, req, options)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("Expecting error, got nothing but output [%s]", writer.String())
|
t.Fatalf("Expecting error, got nothing but output [%s]", writer.String())
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"sort"
|
"sort"
|
||||||
|
@ -355,7 +356,7 @@ func newCmdTop() *cobra.Command {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return getTrafficByResourceFromAPI(k8sAPI, req, table)
|
return getTrafficByResourceFromAPI(cmd.Context(), k8sAPI, req, table)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -382,8 +383,8 @@ func newCmdTop() *cobra.Command {
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTrafficByResourceFromAPI(k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, table *topTable) error {
|
func getTrafficByResourceFromAPI(ctx context.Context, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, table *topTable) error {
|
||||||
reader, body, err := tap.Reader(k8sAPI, req, 0)
|
reader, body, err := tap.Reader(ctx, k8sAPI, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package profiles
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
@ -23,7 +24,7 @@ import (
|
||||||
// RenderTapOutputProfile performs a tap on the desired resource and generates
|
// RenderTapOutputProfile performs a tap on the desired resource and generates
|
||||||
// a service profile with routes pre-populated from the tap data
|
// a service profile with routes pre-populated from the tap data
|
||||||
// Only inbound tap traffic is considered.
|
// Only inbound tap traffic is considered.
|
||||||
func RenderTapOutputProfile(k8sAPI *k8s.KubernetesAPI, tapResource, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int, w io.Writer) error {
|
func RenderTapOutputProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapResource, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int, w io.Writer) error {
|
||||||
requestParams := util.TapRequestParams{
|
requestParams := util.TapRequestParams{
|
||||||
Resource: tapResource,
|
Resource: tapResource,
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
|
@ -35,7 +36,7 @@ func RenderTapOutputProfile(k8sAPI *k8s.KubernetesAPI, tapResource, namespace, n
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
profile, err := tapToServiceProfile(k8sAPI, req, namespace, name, clusterDomain, tapDuration, routeLimit)
|
profile, err := tapToServiceProfile(ctx, k8sAPI, req, namespace, name, clusterDomain, tapDuration, routeLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -48,7 +49,7 @@ func RenderTapOutputProfile(k8sAPI *k8s.KubernetesAPI, tapResource, namespace, n
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func tapToServiceProfile(k8sAPI *k8s.KubernetesAPI, tapReq *pb.TapByResourceRequest, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int) (sp.ServiceProfile, error) {
|
func tapToServiceProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapReq *pb.TapByResourceRequest, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int) (sp.ServiceProfile, error) {
|
||||||
profile := sp.ServiceProfile{
|
profile := sp.ServiceProfile{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: fmt.Sprintf("%s.%s.svc.%s", name, namespace, clusterDomain),
|
Name: fmt.Sprintf("%s.%s.svc.%s", name, namespace, clusterDomain),
|
||||||
|
@ -57,7 +58,9 @@ func tapToServiceProfile(k8sAPI *k8s.KubernetesAPI, tapReq *pb.TapByResourceRequ
|
||||||
TypeMeta: serviceProfileMeta,
|
TypeMeta: serviceProfileMeta,
|
||||||
}
|
}
|
||||||
|
|
||||||
reader, body, err := tap.Reader(k8sAPI, tapReq, tapDuration)
|
ctxWithTime, cancel := context.WithTimeout(ctx, tapDuration)
|
||||||
|
defer cancel()
|
||||||
|
reader, body, err := tap.Reader(ctxWithTime, k8sAPI, tapReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return profile, err
|
return profile, err
|
||||||
}
|
}
|
||||||
|
@ -81,7 +84,8 @@ func routeSpecFromTap(tapByteStream *bufio.Reader, routeLimit int) []*sp.RouteSp
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// expected errors when hitting the tapDuration deadline
|
// expected errors when hitting the tapDuration deadline
|
||||||
if err != io.EOF &&
|
if err != io.EOF &&
|
||||||
!strings.HasSuffix(err.Error(), "(Client.Timeout exceeded while reading body)") &&
|
!strings.HasSuffix(err.Error(), "(Client.Timeout or context cancellation while reading body)") &&
|
||||||
|
!strings.HasSuffix(err.Error(), "context deadline exceeded") &&
|
||||||
!strings.HasSuffix(err.Error(), "http2: response body closed") {
|
!strings.HasSuffix(err.Error(), "http2: response body closed") {
|
||||||
fmt.Fprintln(os.Stderr, err)
|
fmt.Fprintln(os.Stderr, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package profiles
|
package profiles
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -119,7 +120,7 @@ func TestTapToServiceProfile(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
actualServiceProfile, err := tapToServiceProfile(kubeAPI, tapReq, namespace, name, clusterDomain, tapDuration, routeLimit)
|
actualServiceProfile, err := tapToServiceProfile(context.Background(), kubeAPI, tapReq, namespace, name, clusterDomain, tapDuration, routeLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to create ServiceProfile: %v", err)
|
t.Fatalf("Failed to create ServiceProfile: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,11 +3,11 @@ package tap
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
pb "github.com/linkerd/linkerd2/controller/gen/public"
|
pb "github.com/linkerd/linkerd2/controller/gen/public"
|
||||||
|
@ -22,12 +22,11 @@ const TapRbacURL = "https://linkerd.io/tap-rbac"
|
||||||
|
|
||||||
// Reader initiates a TapByResourceRequest and returns a buffered Reader.
|
// Reader initiates a TapByResourceRequest and returns a buffered Reader.
|
||||||
// It is the caller's responsibility to call Close() on the io.ReadCloser.
|
// It is the caller's responsibility to call Close() on the io.ReadCloser.
|
||||||
func Reader(k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, timeout time.Duration) (*bufio.Reader, io.ReadCloser, error) {
|
func Reader(ctx context.Context, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest) (*bufio.Reader, io.ReadCloser, error) {
|
||||||
client, err := k8sAPI.NewClient()
|
client, err := k8sAPI.NewClient()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
client.Timeout = timeout
|
|
||||||
|
|
||||||
reqBytes, err := proto.Marshal(req)
|
reqBytes, err := proto.Marshal(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -49,7 +48,7 @@ func Reader(k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, timeout tim
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
httpRsp, err := client.Do(httpReq)
|
httpRsp, err := client.Do(httpReq.WithContext(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debugf("Error invoking [%s]: %v", url, err)
|
log.Debugf("Error invoking [%s]: %v", url, err)
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
|
|
@ -269,7 +269,7 @@ func (h *handler) handleAPITap(w http.ResponseWriter, req *http.Request, p httpr
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
reader, body, err := tap.Reader(h.k8sAPI, tapReq, 0)
|
reader, body, err := tap.Reader(req.Context(), h.k8sAPI, tapReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If there was a [403] error when initiating a tap, close the
|
// If there was a [403] error when initiating a tap, close the
|
||||||
// socket with `ClosePolicyViolation` status code so that the error
|
// socket with `ClosePolicyViolation` status code so that the error
|
||||||
|
|
Loading…
Reference in New Issue