mirror of https://github.com/linkerd/linkerd2.git
Better CLI error messages when control plane is unavailable (#1428)
Signed-off-by: Kevin Lingerfelt <kl@buoyant.io>
This commit is contained in:
parent
38c4b2937a
commit
00a0572098
|
@ -54,7 +54,7 @@ problems were found.`,
|
|||
|
||||
var apiClient pb.ApiClient
|
||||
if apiAddr != "" {
|
||||
apiClient, err = public.NewInternalClient(apiAddr)
|
||||
apiClient, err = public.NewInternalClient(controlPlaneNamespace, apiAddr)
|
||||
} else {
|
||||
apiClient, err = public.NewExternalClient(controlPlaneNamespace, kubeApi)
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ problems were found.`,
|
|||
os.Exit(2)
|
||||
}
|
||||
|
||||
grpcStatusChecker := healthcheck.NewGrpcStatusChecker(public.ApiSubsystemName, apiClient)
|
||||
grpcStatusChecker := healthcheck.NewGrpcStatusChecker(apiClient)
|
||||
versionStatusChecker := version.NewVersionStatusChecker(versionCheckURL, options.versionOverride, apiClient)
|
||||
|
||||
err = checkStatus(os.Stdout, kubeApi, grpcStatusChecker, versionStatusChecker)
|
||||
|
|
|
@ -1,15 +1,11 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
|
||||
pb "github.com/linkerd/linkerd2/controller/gen/public"
|
||||
"github.com/linkerd/linkerd2/pkg/k8s"
|
||||
"github.com/pkg/browser"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
|
@ -71,22 +67,8 @@ func newCmdDashboard() *cobra.Command {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
client, err := newPublicAPIClient()
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Failed to initialize Linkerd API client: %+v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
dashboardAvailable, err := isDashboardAvailable(client)
|
||||
if err != nil {
|
||||
log.Debugf("Error checking dashboard availability: %s", err)
|
||||
}
|
||||
|
||||
if err != nil || !dashboardAvailable {
|
||||
fmt.Fprintf(os.Stderr, "Linkerd is not running in the \"%s\" namespace\n", controlPlaneNamespace)
|
||||
fmt.Fprintf(os.Stderr, "Install with: linkerd install --linkerd-namespace %s | kubectl apply -f -\n", controlPlaneNamespace)
|
||||
os.Exit(1)
|
||||
}
|
||||
// ensure we can connect to the public API before starting the proxy
|
||||
validatedPublicAPIClient()
|
||||
|
||||
fmt.Printf("Linkerd dashboard available at:\n%s\n", url.String())
|
||||
fmt.Printf("Grafana dashboard available at:\n%s\n", grafanaUrl.String())
|
||||
|
@ -130,17 +112,3 @@ func newCmdDashboard() *cobra.Command {
|
|||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func isDashboardAvailable(client pb.ApiClient) (bool, error) {
|
||||
res, err := client.SelfCheck(context.Background(), &healthcheckPb.SelfCheckRequest{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, result := range res.Results {
|
||||
if result.Status != healthcheckPb.CheckStatus_OK {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
|
|
@ -1,73 +0,0 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/linkerd/linkerd2/controller/api/public"
|
||||
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
|
||||
)
|
||||
|
||||
func TestDashboardAvailability(t *testing.T) {
|
||||
t.Run("Returns true if api client has responds with a list of Self Checks that are OK", func(t *testing.T) {
|
||||
|
||||
mockSelfCheckResponse := &healthcheckPb.SelfCheckResponse{
|
||||
Results: []*healthcheckPb.CheckResult{
|
||||
{
|
||||
SubsystemName: "TestSystem",
|
||||
Status: healthcheckPb.CheckStatus_OK,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
mockPublicApi := &public.MockApiClient{
|
||||
SelfCheckResponseToReturn: mockSelfCheckResponse,
|
||||
}
|
||||
|
||||
dashboardAvailable, err := isDashboardAvailable(mockPublicApi)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected to not receive an error but got: %+v\n", err)
|
||||
}
|
||||
|
||||
if !dashboardAvailable {
|
||||
t.Fatalf("Expected dashboard available to be true but got: %t", dashboardAvailable)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Returns false if public api client returns a list of Self Checks that have failed", func(t *testing.T) {
|
||||
mockSelfCheckResponse := &healthcheckPb.SelfCheckResponse{
|
||||
Results: []*healthcheckPb.CheckResult{
|
||||
{
|
||||
SubsystemName: "TestSystem",
|
||||
Status: healthcheckPb.CheckStatus_FAIL,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
mockPublicApi := &public.MockApiClient{
|
||||
SelfCheckResponseToReturn: mockSelfCheckResponse,
|
||||
}
|
||||
|
||||
dashboardAvailable, err := isDashboardAvailable(mockPublicApi)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected to not receive an error but got: %+v\n", err)
|
||||
}
|
||||
|
||||
if dashboardAvailable {
|
||||
t.Fatalf("Expected dashboard available to be false but got: %t", dashboardAvailable)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return false when public API Self Check fails to make a request", func(t *testing.T) {
|
||||
mockPublicApi := &public.MockApiClient{
|
||||
ErrorToReturn: errors.New("expected"),
|
||||
}
|
||||
dashboardAvailable, err := isDashboardAvailable(mockPublicApi)
|
||||
if err == nil {
|
||||
t.Fatalf("Expected error to not be nil")
|
||||
}
|
||||
if dashboardAvailable {
|
||||
t.Fatalf("Expected dashboard available to return false but gotL %t", dashboardAvailable)
|
||||
}
|
||||
})
|
||||
}
|
|
@ -54,12 +54,8 @@ Only pod resources (aka pods, po) are supported.`,
|
|||
if err != nil || resourceType != k8s.Pod {
|
||||
return fmt.Errorf("invalid resource type %s, valid types: %s", friendlyName, k8s.Pod)
|
||||
}
|
||||
client, err := newPublicAPIClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
podNames, err := getPods(client, options)
|
||||
podNames, err := getPods(validatedPublicAPIClient(), options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1,12 +1,15 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/linkerd/linkerd2/controller/api/public"
|
||||
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
|
||||
pb "github.com/linkerd/linkerd2/controller/gen/public"
|
||||
"github.com/linkerd/linkerd2/pkg/k8s"
|
||||
"github.com/linkerd/linkerd2/pkg/version"
|
||||
|
@ -14,6 +17,8 @@ import (
|
|||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
const defaultNamespace = "linkerd"
|
||||
|
||||
var controlPlaneNamespace string
|
||||
var apiAddr string // An empty value means "use the Kubernetes configuration"
|
||||
var kubeconfigPath string
|
||||
|
@ -48,7 +53,7 @@ var RootCmd = &cobra.Command{
|
|||
}
|
||||
|
||||
func init() {
|
||||
RootCmd.PersistentFlags().StringVarP(&controlPlaneNamespace, "linkerd-namespace", "l", "linkerd", "Namespace in which Linkerd is installed")
|
||||
RootCmd.PersistentFlags().StringVarP(&controlPlaneNamespace, "linkerd-namespace", "l", defaultNamespace, "Namespace in which Linkerd is installed")
|
||||
RootCmd.PersistentFlags().StringVar(&kubeconfigPath, "kubeconfig", "", "Path to the kubeconfig file to use for CLI requests")
|
||||
RootCmd.PersistentFlags().StringVar(&apiAddr, "api-addr", "", "Override kubeconfig and communicate directly with the control plane at host:port (mostly for testing)")
|
||||
RootCmd.PersistentFlags().BoolVar(&verbose, "verbose", false, "Turn on debug logging")
|
||||
|
@ -64,14 +69,72 @@ func init() {
|
|||
RootCmd.AddCommand(newCmdVersion())
|
||||
}
|
||||
|
||||
// validatedPublicAPIClient builds a new public API client and executes status
|
||||
// checks to determine if the client can successfully connect to the API. If the
|
||||
// checks fail, then CLI will print an error and exit.
|
||||
func validatedPublicAPIClient() pb.ApiClient {
|
||||
client, err := newPublicAPIClient()
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Cannot connect to Kubernetes: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
var selfCheckWithRetry func() error
|
||||
selfCheckWithRetry = func() error {
|
||||
res, err := client.SelfCheck(context.Background(), &healthcheckPb.SelfCheckRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, result := range res.Results {
|
||||
if result.Status != healthcheckPb.CheckStatus_OK {
|
||||
// If the control plane can't talk to Prometheus, that's likely a result
|
||||
// of Prometheus not passing its readiness check yet on startup. In that
|
||||
// case, print a waiting message and retry.
|
||||
if result.SubsystemName == public.PromClientSubsystemName {
|
||||
fmt.Fprintln(os.Stderr, "Waiting for controller to connect to Prometheus")
|
||||
return selfCheckWithRetry()
|
||||
}
|
||||
|
||||
return fmt.Errorf(result.FriendlyMessageToUser)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := selfCheckWithRetry(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Cannot connect to Linkerd: %s\n", err)
|
||||
checkCmd := "linkerd check"
|
||||
if controlPlaneNamespace != defaultNamespace {
|
||||
checkCmd += fmt.Sprintf(" --linkerd-namespace %s", controlPlaneNamespace)
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "Validate the install with: %s\n", checkCmd)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
return client
|
||||
}
|
||||
|
||||
// newPublicAPIClient executes status checks to determine if we can connect
|
||||
// to Kubernetes, and if so returns a new public API client. Otherwise it
|
||||
// returns an error.
|
||||
func newPublicAPIClient() (pb.ApiClient, error) {
|
||||
if apiAddr != "" {
|
||||
return public.NewInternalClient(apiAddr)
|
||||
return public.NewInternalClient(controlPlaneNamespace, apiAddr)
|
||||
}
|
||||
|
||||
kubeAPI, err := k8s.NewAPI(kubeconfigPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, result := range kubeAPI.SelfCheck() {
|
||||
if result.Status != healthcheckPb.CheckStatus_OK {
|
||||
return nil, fmt.Errorf(result.FriendlyMessageToUser)
|
||||
}
|
||||
}
|
||||
|
||||
return public.NewExternalClient(controlPlaneNamespace, kubeAPI)
|
||||
}
|
||||
|
||||
|
|
|
@ -101,17 +101,12 @@ If no resource name is specified, displays stats about all resources of the spec
|
|||
Args: cobra.RangeArgs(1, 2),
|
||||
ValidArgs: util.ValidTargets,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
client, err := newPublicAPIClient()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating api client while making stats request: %v", err)
|
||||
}
|
||||
|
||||
req, err := buildStatSummaryRequest(args, options)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating metrics request while making stats request: %v", err)
|
||||
}
|
||||
|
||||
output, err := requestStatsFromAPI(client, req, options)
|
||||
output, err := requestStatsFromAPI(validatedPublicAPIClient(), req, options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -90,12 +90,7 @@ func newCmdTap() *cobra.Command {
|
|||
return err
|
||||
}
|
||||
|
||||
client, err := newPublicAPIClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return requestTapByResourceFromAPI(os.Stdout, client, req)
|
||||
return requestTapByResourceFromAPI(os.Stdout, validatedPublicAPIClient(), req)
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -5,7 +5,9 @@ import (
|
|||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/linkerd/linkerd2/controller/api/public"
|
||||
pb "github.com/linkerd/linkerd2/controller/gen/public"
|
||||
"github.com/linkerd/linkerd2/pkg/k8s"
|
||||
"github.com/linkerd/linkerd2/pkg/version"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
@ -38,7 +40,7 @@ func newCmdVersion() *cobra.Command {
|
|||
fmt.Printf("Client version: %s\n", clientVersion)
|
||||
}
|
||||
|
||||
client, err := newPublicAPIClient()
|
||||
client, err := newVersionClient()
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error connecting to server: %s\n", err)
|
||||
os.Exit(1)
|
||||
|
@ -70,3 +72,15 @@ func getServerVersion(client pb.ApiClient) string {
|
|||
|
||||
return resp.GetReleaseVersion()
|
||||
}
|
||||
|
||||
// This client does not do any validation
|
||||
func newVersionClient() (pb.ApiClient, error) {
|
||||
if apiAddr != "" {
|
||||
return public.NewInternalClient(controlPlaneNamespace, apiAddr)
|
||||
}
|
||||
kubeAPI, err := k8s.NewAPI(kubeconfigPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return public.NewExternalClient(controlPlaneNamespace, kubeAPI)
|
||||
}
|
||||
|
|
|
@ -20,15 +20,15 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
apiRoot = "/" // Must be absolute (with a leading slash).
|
||||
apiVersion = "v1"
|
||||
apiPrefix = "api/" + apiVersion + "/" // Must be relative (without a leading slash).
|
||||
ApiSubsystemName = "linkerd-api"
|
||||
apiRoot = "/" // Must be absolute (with a leading slash).
|
||||
apiVersion = "v1"
|
||||
apiPrefix = "api/" + apiVersion + "/" // Must be relative (without a leading slash).
|
||||
)
|
||||
|
||||
type grpcOverHttpClient struct {
|
||||
serverURL *url.URL
|
||||
httpClient *http.Client
|
||||
serverURL *url.URL
|
||||
httpClient *http.Client
|
||||
controlPlaneNamespace string
|
||||
}
|
||||
|
||||
// TODO: This will replace Stat, once implemented
|
||||
|
@ -45,9 +45,23 @@ func (c *grpcOverHttpClient) Version(ctx context.Context, req *pb.Empty, _ ...gr
|
|||
}
|
||||
|
||||
func (c *grpcOverHttpClient) SelfCheck(ctx context.Context, req *healthcheckPb.SelfCheckRequest, _ ...grpc.CallOption) (*healthcheckPb.SelfCheckResponse, error) {
|
||||
checkResponse := &healthcheckPb.SelfCheckResponse{
|
||||
Results: []*healthcheckPb.CheckResult{c.checkIfNamespaceExists()},
|
||||
}
|
||||
|
||||
// If the namespace does not exist, abort before making the SelfCheck API call
|
||||
if checkResponse.Results[0].Status != healthcheckPb.CheckStatus_OK {
|
||||
return checkResponse, nil
|
||||
}
|
||||
|
||||
var msg healthcheckPb.SelfCheckResponse
|
||||
err := c.apiRequest(ctx, "SelfCheck", req, &msg)
|
||||
return &msg, err
|
||||
if err != nil {
|
||||
return checkResponse, err
|
||||
}
|
||||
|
||||
checkResponse.Results = append(checkResponse.Results, msg.Results...)
|
||||
return checkResponse, nil
|
||||
}
|
||||
|
||||
func (c *grpcOverHttpClient) ListPods(ctx context.Context, req *pb.ListPodsRequest, _ ...grpc.CallOption) (*pb.ListPodsResponse, error) {
|
||||
|
@ -67,7 +81,7 @@ func (c *grpcOverHttpClient) TapByResource(ctx context.Context, req *pb.TapByRes
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if err = checkIfResponseHasErrorHeader(httpRsp); err != nil {
|
||||
if err := checkIfResponseHasError(httpRsp); err != nil {
|
||||
httpRsp.Body.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
@ -92,12 +106,7 @@ func (c *grpcOverHttpClient) apiRequest(ctx context.Context, endpoint string, re
|
|||
defer httpRsp.Body.Close()
|
||||
log.Debugf("gRPC-over-HTTP call returned status [%s] and content length [%d]", httpRsp.Status, httpRsp.ContentLength)
|
||||
|
||||
clientSideErrorStatusCode := httpRsp.StatusCode >= 400 && httpRsp.StatusCode <= 499
|
||||
if clientSideErrorStatusCode {
|
||||
return fmt.Errorf("POST to API endpoint [%s] returned HTTP status [%s]", url, httpRsp.Status)
|
||||
}
|
||||
|
||||
if err = checkIfResponseHasErrorHeader(httpRsp); err != nil {
|
||||
if err := checkIfResponseHasError(httpRsp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -134,6 +143,39 @@ func (c *grpcOverHttpClient) endpointNameToPublicApiUrl(endpoint string) *url.UR
|
|||
return c.serverURL.ResolveReference(&url.URL{Path: endpoint})
|
||||
}
|
||||
|
||||
func (c *grpcOverHttpClient) checkIfNamespaceExists() *healthcheckPb.CheckResult {
|
||||
checkResult := &healthcheckPb.CheckResult{
|
||||
SubsystemName: "namespace",
|
||||
CheckDescription: "control plane namespace exists",
|
||||
Status: healthcheckPb.CheckStatus_OK,
|
||||
}
|
||||
|
||||
url := *c.serverURL
|
||||
url.Path = fmt.Sprintf("/api/v1/namespaces/%s", c.controlPlaneNamespace)
|
||||
log.Debugf("Making GET request to [%s]", url.String())
|
||||
|
||||
rsp, err := c.httpClient.Get(url.String())
|
||||
if err != nil {
|
||||
checkResult.Status = healthcheckPb.CheckStatus_ERROR
|
||||
checkResult.FriendlyMessageToUser = fmt.Sprintf("Error calling the Kubernetes API: %s", err)
|
||||
return checkResult
|
||||
}
|
||||
|
||||
if rsp.StatusCode == http.StatusNotFound {
|
||||
checkResult.Status = healthcheckPb.CheckStatus_FAIL
|
||||
checkResult.FriendlyMessageToUser = fmt.Sprintf("The \"%s\" namespace does not exist", c.controlPlaneNamespace)
|
||||
return checkResult
|
||||
}
|
||||
|
||||
if rsp.StatusCode != http.StatusOK {
|
||||
checkResult.Status = healthcheckPb.CheckStatus_ERROR
|
||||
checkResult.FriendlyMessageToUser = fmt.Sprintf("Unexpected Kubernetes API response: %s", rsp.Status)
|
||||
return checkResult
|
||||
}
|
||||
|
||||
return checkResult
|
||||
}
|
||||
|
||||
type tapClient struct {
|
||||
ctx context.Context
|
||||
reader *bufio.Reader
|
||||
|
@ -167,8 +209,7 @@ func fromByteStreamToProtocolBuffers(byteStreamContainingMessage *bufio.Reader,
|
|||
return nil
|
||||
}
|
||||
|
||||
func newClient(apiURL *url.URL, httpClientToUse *http.Client) (pb.ApiClient, error) {
|
||||
|
||||
func newClient(apiURL *url.URL, httpClientToUse *http.Client, controlPlaneNamespace string) (pb.ApiClient, error) {
|
||||
if !apiURL.IsAbs() {
|
||||
return nil, fmt.Errorf("server URL must be absolute, was [%s]", apiURL.String())
|
||||
}
|
||||
|
@ -178,18 +219,19 @@ func newClient(apiURL *url.URL, httpClientToUse *http.Client) (pb.ApiClient, err
|
|||
log.Debugf("Expecting API to be served over [%s]", serverUrl)
|
||||
|
||||
return &grpcOverHttpClient{
|
||||
serverURL: serverUrl,
|
||||
httpClient: httpClientToUse,
|
||||
serverURL: serverUrl,
|
||||
httpClient: httpClientToUse,
|
||||
controlPlaneNamespace: controlPlaneNamespace,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewInternalClient(kubernetesApiHost string) (pb.ApiClient, error) {
|
||||
func NewInternalClient(controlPlaneNamespace string, kubernetesApiHost string) (pb.ApiClient, error) {
|
||||
apiURL, err := url.Parse(fmt.Sprintf("http://%s/", kubernetesApiHost))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newClient(apiURL, http.DefaultClient)
|
||||
return newClient(apiURL, http.DefaultClient, controlPlaneNamespace)
|
||||
}
|
||||
|
||||
func NewExternalClient(controlPlaneNamespace string, kubeApi k8s.KubernetesApi) (pb.ApiClient, error) {
|
||||
|
@ -203,5 +245,5 @@ func NewExternalClient(controlPlaneNamespace string, kubeApi k8s.KubernetesApi)
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return newClient(apiURL, httpClientToUse)
|
||||
return newClient(apiURL, httpClientToUse, controlPlaneNamespace)
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
|
||||
pb "github.com/linkerd/linkerd2/controller/gen/public"
|
||||
)
|
||||
|
||||
|
@ -29,7 +30,7 @@ func TestNewInternalClient(t *testing.T) {
|
|||
t.Run("Makes a well-formed request over the Kubernetes public API", func(t *testing.T) {
|
||||
mockTransport := &mockTransport{}
|
||||
mockTransport.responseToReturn = &http.Response{
|
||||
StatusCode: 500,
|
||||
StatusCode: 200,
|
||||
Body: ioutil.NopCloser(bufferedReader(t, &pb.Empty{})),
|
||||
}
|
||||
mockHttpClient := &http.Client{
|
||||
|
@ -41,7 +42,7 @@ func TestNewInternalClient(t *testing.T) {
|
|||
Host: "some-hostname",
|
||||
Path: "/",
|
||||
}
|
||||
client, err := newClient(apiURL, mockHttpClient)
|
||||
client, err := newClient(apiURL, mockHttpClient, "linkerd")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
@ -60,7 +61,6 @@ func TestNewInternalClient(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFromByteStreamToProtocolBuffers(t *testing.T) {
|
||||
|
||||
t.Run("Correctly marshalls an valid object", func(t *testing.T) {
|
||||
versionInfo := pb.VersionInfo{
|
||||
GoVersion: "1.9.1",
|
||||
|
@ -154,6 +154,54 @@ func TestFromByteStreamToProtocolBuffers(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSelfCheck(t *testing.T) {
|
||||
t.Run("Returns failed status check if namespace does not exist", func(t *testing.T) {
|
||||
mockTransport := &mockTransport{}
|
||||
mockTransport.responseToReturn = &http.Response{
|
||||
StatusCode: 404,
|
||||
}
|
||||
mockHttpClient := &http.Client{
|
||||
Transport: mockTransport,
|
||||
}
|
||||
|
||||
apiURL := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "some-hostname",
|
||||
Path: "/",
|
||||
}
|
||||
client, err := newClient(apiURL, mockHttpClient, "testnamespace")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
rsp, err := client.SelfCheck(context.Background(), &healthcheckPb.SelfCheckRequest{})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
expectedUrlRequested := "http://some-hostname/api/v1/namespaces/testnamespace"
|
||||
actualUrlRequested := mockTransport.requestSent.URL.String()
|
||||
if actualUrlRequested != expectedUrlRequested {
|
||||
t.Fatalf("Expected request to URL [%v], but got [%v]", expectedUrlRequested, actualUrlRequested)
|
||||
}
|
||||
|
||||
if len(rsp.Results) != 1 {
|
||||
t.Fatalf("Expected one check result, got %v", rsp.Results)
|
||||
}
|
||||
|
||||
expectedCheckResult := &healthcheckPb.CheckResult{
|
||||
SubsystemName: "namespace",
|
||||
CheckDescription: "control plane namespace exists",
|
||||
Status: healthcheckPb.CheckStatus_FAIL,
|
||||
FriendlyMessageToUser: "The \"testnamespace\" namespace does not exist",
|
||||
}
|
||||
|
||||
if !proto.Equal(rsp.Results[0], expectedCheckResult) {
|
||||
t.Fatalf("Expected check result to be [%v], but got [%v]", expectedCheckResult, rsp.Results[0])
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func bufferedReader(t *testing.T, msg proto.Message) *bufio.Reader {
|
||||
msgBytes, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
|
|
|
@ -179,7 +179,7 @@ func (s *grpcServer) SelfCheck(ctx context.Context, in *healthcheckPb.SelfCheckR
|
|||
_, err := s.k8sAPI.Pod().Lister().List(labels.Everything())
|
||||
if err != nil {
|
||||
k8sClientCheck.Status = healthcheckPb.CheckStatus_ERROR
|
||||
k8sClientCheck.FriendlyMessageToUser = fmt.Sprintf("Error talking to Kubernetes from control plane: %s", err.Error())
|
||||
k8sClientCheck.FriendlyMessageToUser = fmt.Sprintf("Error calling the Kubernetes API: %s", err)
|
||||
}
|
||||
|
||||
promClientCheck := &healthcheckPb.CheckResult{
|
||||
|
@ -190,7 +190,7 @@ func (s *grpcServer) SelfCheck(ctx context.Context, in *healthcheckPb.SelfCheckR
|
|||
_, err = s.queryProm(ctx, fmt.Sprintf(podQuery, ""))
|
||||
if err != nil {
|
||||
promClientCheck.Status = healthcheckPb.CheckStatus_ERROR
|
||||
promClientCheck.FriendlyMessageToUser = fmt.Sprintf("Error talking to Prometheus from control plane: %s", err.Error())
|
||||
promClientCheck.FriendlyMessageToUser = fmt.Sprintf("Error calling Prometheus from the control plane: %s", err)
|
||||
}
|
||||
|
||||
response := &healthcheckPb.SelfCheckResponse{
|
||||
|
|
|
@ -86,7 +86,7 @@ func TestServer(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
client, err := NewInternalClient(listener.Addr().String())
|
||||
client, err := NewInternalClient("linkerd", listener.Addr().String())
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
@ -118,20 +118,7 @@ func TestServer(t *testing.T) {
|
|||
functionCall: func() (proto.Message, error) { return client.Version(context.TODO(), versionReq) },
|
||||
}
|
||||
|
||||
selfCheckReq := &healcheckPb.SelfCheckRequest{}
|
||||
testSelfCheck := grpcCallTestCase{
|
||||
expectedRequest: selfCheckReq,
|
||||
expectedResponse: &healcheckPb.SelfCheckResponse{
|
||||
Results: []*healcheckPb.CheckResult{
|
||||
{
|
||||
SubsystemName: "banana",
|
||||
},
|
||||
},
|
||||
},
|
||||
functionCall: func() (proto.Message, error) { return client.SelfCheck(context.TODO(), selfCheckReq) },
|
||||
}
|
||||
|
||||
for _, testCase := range []grpcCallTestCase{testListPods, testStatSummary, testVersion, testSelfCheck} {
|
||||
for _, testCase := range []grpcCallTestCase{testListPods, testStatSummary, testVersion} {
|
||||
assertCallWasForwarded(t, mockGrpcServer, testCase.expectedRequest, testCase.expectedResponse, testCase.functionCall)
|
||||
}
|
||||
})
|
||||
|
@ -154,7 +141,7 @@ func TestServer(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
client, err := NewInternalClient(listener.Addr().String())
|
||||
client, err := NewInternalClient("linkerd", listener.Addr().String())
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
@ -214,7 +201,7 @@ func TestServer(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
client, err := NewInternalClient(listener.Addr().String())
|
||||
client, err := NewInternalClient("linkerd", listener.Addr().String())
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
|
|
@ -134,7 +134,7 @@ func deserializePayloadFromReader(reader *bufio.Reader) ([]byte, error) {
|
|||
return messageContentsAsBytes, nil
|
||||
}
|
||||
|
||||
func checkIfResponseHasErrorHeader(rsp *http.Response) error {
|
||||
func checkIfResponseHasError(rsp *http.Response) error {
|
||||
errorMsg := rsp.Header.Get(errorHeader)
|
||||
|
||||
if errorMsg != "" {
|
||||
|
@ -143,11 +143,15 @@ func checkIfResponseHasErrorHeader(rsp *http.Response) error {
|
|||
|
||||
err := fromByteStreamToProtocolBuffers(reader, &apiError)
|
||||
if err != nil {
|
||||
return fmt.Errorf("response has %s header [%s], but response body didn't contain protobuf error: %v", errorHeader, errorMsg, err)
|
||||
return fmt.Errorf("Response has %s header [%s], but response body didn't contain protobuf error: %v", errorHeader, errorMsg, err)
|
||||
}
|
||||
|
||||
return errors.New(apiError.Error)
|
||||
}
|
||||
|
||||
if rsp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("Unexpected API response: %s", rsp.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -436,15 +436,15 @@ func TestNewStreamingWriter(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCheckIfResponseHasError(t *testing.T) {
|
||||
t.Run("returns nil if response doesn't contain linkerd-error header", func(t *testing.T) {
|
||||
t.Run("returns nil if response doesn't contain linkerd-error header and is 200", func(t *testing.T) {
|
||||
response := &http.Response{
|
||||
Header: make(http.Header),
|
||||
Header: make(http.Header),
|
||||
StatusCode: http.StatusOK,
|
||||
}
|
||||
err := checkIfResponseHasErrorHeader(response)
|
||||
err := checkIfResponseHasError(response)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
t.Run("returns error in body if response contains linkerd-error header", func(t *testing.T) {
|
||||
|
@ -460,12 +460,13 @@ func TestCheckIfResponseHasError(t *testing.T) {
|
|||
}
|
||||
|
||||
response := &http.Response{
|
||||
Header: make(http.Header),
|
||||
Body: ioutil.NopCloser(bytes.NewReader(message)),
|
||||
Header: make(http.Header),
|
||||
Body: ioutil.NopCloser(bytes.NewReader(message)),
|
||||
StatusCode: http.StatusInternalServerError,
|
||||
}
|
||||
response.Header.Set(errorHeader, "error")
|
||||
|
||||
err = checkIfResponseHasErrorHeader(response)
|
||||
err = checkIfResponseHasError(response)
|
||||
if err == nil {
|
||||
t.Fatalf("Expecting error, got nothing")
|
||||
}
|
||||
|
@ -488,16 +489,35 @@ func TestCheckIfResponseHasError(t *testing.T) {
|
|||
}
|
||||
|
||||
response := &http.Response{
|
||||
Header: make(http.Header),
|
||||
Body: ioutil.NopCloser(bytes.NewReader(message)),
|
||||
Header: make(http.Header),
|
||||
Body: ioutil.NopCloser(bytes.NewReader(message)),
|
||||
StatusCode: http.StatusInternalServerError,
|
||||
}
|
||||
response.Header.Set(errorHeader, "error")
|
||||
|
||||
err = checkIfResponseHasErrorHeader(response)
|
||||
err = checkIfResponseHasError(response)
|
||||
if err == nil {
|
||||
t.Fatalf("Expecting error, got nothing")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("returns error if response is not a 200", func(t *testing.T) {
|
||||
response := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Status: "503 Service Unavailable",
|
||||
}
|
||||
|
||||
err := checkIfResponseHasError(response)
|
||||
if err == nil {
|
||||
t.Fatalf("Expecting error, got nothing")
|
||||
}
|
||||
|
||||
expectedErrorMessage := "Unexpected API response: 503 Service Unavailable"
|
||||
actualErrorMessage := err.Error()
|
||||
if actualErrorMessage != expectedErrorMessage {
|
||||
t.Fatalf("Expected error message to be [%s], but it was [%s]", expectedErrorMessage, actualErrorMessage)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func assertResponseHasProtobufContentType(t *testing.T, responseWriter *stubResponseWriter) {
|
||||
|
|
|
@ -9,45 +9,42 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const grpcApiSubsystemName = "linkerd-api"
|
||||
|
||||
type grpcStatusChecker interface {
|
||||
SelfCheck(ctx context.Context, in *healthcheckPb.SelfCheckRequest, opts ...grpc.CallOption) (*healthcheckPb.SelfCheckResponse, error)
|
||||
}
|
||||
|
||||
type statusCheckerProxy struct {
|
||||
delegate grpcStatusChecker
|
||||
prefix string
|
||||
}
|
||||
|
||||
func (proxy *statusCheckerProxy) SelfCheck() []*healthcheckPb.CheckResult {
|
||||
canConnectViaGrpcCheck := &healthcheckPb.CheckResult{
|
||||
Status: healthcheckPb.CheckStatus_OK,
|
||||
SubsystemName: proxy.prefix,
|
||||
CheckDescription: "can query the Linkerd API",
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
selfCheckResponse, err := proxy.delegate.SelfCheck(ctx, &healthcheckPb.SelfCheckRequest{})
|
||||
if err != nil {
|
||||
canConnectViaGrpcCheck.Status = healthcheckPb.CheckStatus_ERROR
|
||||
canConnectViaGrpcCheck.FriendlyMessageToUser = err.Error()
|
||||
return []*healthcheckPb.CheckResult{canConnectViaGrpcCheck}
|
||||
return []*healthcheckPb.CheckResult{
|
||||
&healthcheckPb.CheckResult{
|
||||
SubsystemName: grpcApiSubsystemName,
|
||||
CheckDescription: "can query the Linkerd API",
|
||||
Status: healthcheckPb.CheckStatus_ERROR,
|
||||
FriendlyMessageToUser: err.Error(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
for _, check := range selfCheckResponse.Results {
|
||||
fullSubsystemName := fmt.Sprintf("%s[%s]", proxy.prefix, check.SubsystemName)
|
||||
fullSubsystemName := fmt.Sprintf("%s[%s]", grpcApiSubsystemName, check.SubsystemName)
|
||||
check.SubsystemName = fullSubsystemName
|
||||
}
|
||||
|
||||
subsystemResults := []*healthcheckPb.CheckResult{canConnectViaGrpcCheck}
|
||||
subsystemResults = append(subsystemResults, selfCheckResponse.Results...)
|
||||
return subsystemResults
|
||||
return selfCheckResponse.Results
|
||||
}
|
||||
|
||||
func NewGrpcStatusChecker(name string, grpClient grpcStatusChecker) StatusChecker {
|
||||
func NewGrpcStatusChecker(grpClient grpcStatusChecker) StatusChecker {
|
||||
return &statusCheckerProxy{
|
||||
prefix: name,
|
||||
delegate: grpClient,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -91,7 +91,7 @@ func (kubeapi *kubernetesApi) checkApiAccess(client *http.Client) (*healthcheckP
|
|||
endpointToCheck, err := url.Parse(kubeapi.Host + "/version")
|
||||
if err != nil {
|
||||
checkResult.Status = healthcheckPb.CheckStatus_ERROR
|
||||
checkResult.FriendlyMessageToUser = fmt.Sprintf("Error querying Kubernetes API. Configured host is [%s], error message is [%s]", kubeapi.Host, err.Error())
|
||||
checkResult.FriendlyMessageToUser = fmt.Sprintf("Error configuring the Kubernetes API host %s: %s", kubeapi.Host, err)
|
||||
return checkResult, ""
|
||||
}
|
||||
|
||||
|
@ -102,7 +102,7 @@ func (kubeapi *kubernetesApi) checkApiAccess(client *http.Client) (*healthcheckP
|
|||
resp, err := client.Do(req.WithContext(ctx))
|
||||
if err != nil {
|
||||
checkResult.Status = healthcheckPb.CheckStatus_ERROR
|
||||
checkResult.FriendlyMessageToUser = fmt.Sprintf("HTTP GET request to endpoint [%s] resulted in error: [%s]", endpointToCheck, err.Error())
|
||||
checkResult.FriendlyMessageToUser = fmt.Sprintf("Error calling the Kubernetes API: %s", err)
|
||||
return checkResult, ""
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
@ -110,15 +110,14 @@ func (kubeapi *kubernetesApi) checkApiAccess(client *http.Client) (*healthcheckP
|
|||
bytes, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
checkResult.Status = healthcheckPb.CheckStatus_ERROR
|
||||
checkResult.FriendlyMessageToUser = fmt.Sprintf("HTTP GET request to endpoint [%s] resulted in invalid response: [%v]", endpointToCheck, resp)
|
||||
checkResult.FriendlyMessageToUser = fmt.Sprintf("Error reading Kubernetes API response body: %s", err)
|
||||
return checkResult, ""
|
||||
}
|
||||
body := string(bytes)
|
||||
|
||||
statusCodeReturnedIsWithinSuccessRange := resp.StatusCode < 400
|
||||
if !statusCodeReturnedIsWithinSuccessRange {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
checkResult.Status = healthcheckPb.CheckStatus_FAIL
|
||||
checkResult.FriendlyMessageToUser = fmt.Sprintf("HTTP GET request to endpoint [%s] resulted in Status: [%s], body: [%s]", endpointToCheck, resp.Status, body)
|
||||
checkResult.FriendlyMessageToUser = fmt.Sprintf("Unexpected Kubernetes API response: %s, body: %s", resp.Status, body)
|
||||
return checkResult, ""
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
kubernetes-api: can initialize the client..................................[ok]
|
||||
kubernetes-api: can query the Kubernetes API...............................[ok]
|
||||
kubernetes-api: is running the minimum Kubernetes API version..............[ok]
|
||||
linkerd-api: can query the Linkerd API.....................................[ok]
|
||||
linkerd-api[namespace]: control plane namespace exists.....................[ok]
|
||||
linkerd-api[kubernetes]: control plane can talk to Kubernetes..............[ok]
|
||||
linkerd-api[prometheus]: control plane can talk to Prometheus..............[ok]
|
||||
linkerd-version: cli is up-to-date.........................................[ok]
|
||||
|
|
|
@ -32,7 +32,7 @@ func main() {
|
|||
if err != nil {
|
||||
log.Fatalf("failed to parse API server address: %s", *kubernetesApiHost)
|
||||
}
|
||||
client, err := public.NewInternalClient(*kubernetesApiHost)
|
||||
client, err := public.NewInternalClient(*controllerNamespace, *kubernetesApiHost)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to construct client for API server URL %s", *kubernetesApiHost)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue