Move all healthcheck-related code to pkg/healthcheck (#1492)

* Move all healthcheck-related code to pkg/healthcheck
* Fix failed check formatting
* Better version check wording

Signed-off-by: Kevin Lingerfelt <kl@buoyant.io>
This commit is contained in:
Kevin Lingerfelt 2018-08-20 16:50:22 -07:00 committed by GitHub
parent b8434d60d4
commit e97be1f5da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 559 additions and 752 deletions

View File

@ -1,26 +1,18 @@
package cmd
import (
"errors"
"fmt"
"io"
"os"
"github.com/linkerd/linkerd2/controller/api/public"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/healthcheck"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/version"
"github.com/spf13/cobra"
)
const (
lineWidth = 80
okStatus = "[ok]"
failStatus = "[FAIL]"
errorStatus = "[ERROR]"
versionCheckURL = "https://versioncheck.linkerd.io/version.json"
lineWidth = 80
okStatus = "[ok]"
failStatus = "[FAIL]"
)
type checkOptions struct {
@ -44,33 +36,7 @@ local system, the Linkerd control plane, and connectivity between those. The pro
problems were found.`,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
kubeApi, err := k8s.NewAPI(kubeconfigPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Error with Kubernetes API: %s\n", err.Error())
statusCheckResultWasError(os.Stdout)
os.Exit(2)
}
var apiClient pb.ApiClient
if apiAddr != "" {
apiClient, err = public.NewInternalClient(controlPlaneNamespace, apiAddr)
} else {
apiClient, err = public.NewExternalClient(controlPlaneNamespace, kubeApi)
}
if err != nil {
fmt.Fprintf(os.Stderr, "Error with Linkerd API: %s\n", err.Error())
statusCheckResultWasError(os.Stdout)
os.Exit(2)
}
grpcStatusChecker := healthcheck.NewGrpcStatusChecker(apiClient)
versionStatusChecker := version.NewVersionStatusChecker(versionCheckURL, options.versionOverride, apiClient)
err = checkStatus(os.Stdout, kubeApi, grpcStatusChecker, versionStatusChecker)
if err != nil {
os.Exit(2)
}
configureAndRunChecks(options)
},
}
@ -80,9 +46,27 @@ problems were found.`,
return cmd
}
func checkStatus(w io.Writer, checkers ...healthcheck.StatusChecker) error {
prettyPrintResults := func(result *healthcheckPb.CheckResult) {
checkLabel := fmt.Sprintf("%s: %s", result.SubsystemName, result.CheckDescription)
func configureAndRunChecks(options *checkOptions) {
hc := healthcheck.NewHealthChecker()
hc.AddKubernetesAPIChecks(kubeconfigPath)
hc.AddLinkerdAPIChecks(apiAddr, controlPlaneNamespace)
hc.AddLinkerdVersionChecks(options.versionOverride)
success := runChecks(os.Stdout, hc)
fmt.Println("")
if !success {
fmt.Printf("Status check results are %s\n", failStatus)
os.Exit(2)
}
fmt.Printf("Status check results are %s\n", okStatus)
}
func runChecks(w io.Writer, hc *healthcheck.HealthChecker) bool {
prettyPrintResults := func(category, description string, err error) {
checkLabel := fmt.Sprintf("%s: %s", category, description)
filler := ""
lineBreak := "\n"
@ -90,49 +74,13 @@ func checkStatus(w io.Writer, checkers ...healthcheck.StatusChecker) error {
filler = filler + "."
}
switch result.Status {
case healthcheckPb.CheckStatus_OK:
fmt.Fprintf(w, "%s%s%s%s", checkLabel, filler, okStatus, lineBreak)
case healthcheckPb.CheckStatus_FAIL:
fmt.Fprintf(w, "%s%s%s -- %s%s", checkLabel, filler, failStatus, result.FriendlyMessageToUser, lineBreak)
case healthcheckPb.CheckStatus_ERROR:
fmt.Fprintf(w, "%s%s%s -- %s%s", checkLabel, filler, errorStatus, result.FriendlyMessageToUser, lineBreak)
if err != nil {
fmt.Fprintf(w, "%s%s%s -- %s%s", checkLabel, filler, failStatus, err.Error(), lineBreak)
return
}
fmt.Fprintf(w, "%s%s%s%s", checkLabel, filler, okStatus, lineBreak)
}
checker := healthcheck.MakeHealthChecker()
for _, c := range checkers {
checker.Add(c)
}
checkStatus := checker.PerformCheck(prettyPrintResults)
fmt.Fprintln(w, "")
var err error
switch checkStatus {
case healthcheckPb.CheckStatus_OK:
err = statusCheckResultWasOk(w)
case healthcheckPb.CheckStatus_FAIL:
err = statusCheckResultWasFail(w)
case healthcheckPb.CheckStatus_ERROR:
err = statusCheckResultWasError(w)
}
return err
}
func statusCheckResultWasOk(w io.Writer) error {
fmt.Fprintln(w, "Status check results are [ok]")
return nil
}
func statusCheckResultWasFail(w io.Writer) error {
fmt.Fprintln(w, "Status check results are [FAIL]")
return errors.New("failed status check")
}
func statusCheckResultWasError(w io.Writer) error {
fmt.Fprintln(w, "Status check results are [ERROR]")
return errors.New("error during status check")
return hc.RunChecks(prettyPrintResults)
}

View File

@ -2,41 +2,27 @@ package cmd
import (
"bytes"
"fmt"
"io/ioutil"
"testing"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/healthcheck"
)
func TestCheckStatus(t *testing.T) {
t.Run("Prints expected output", func(t *testing.T) {
kubeApi := &k8s.MockKubeApi{}
kubeApi.SelfCheckResultsToReturn = []*healthcheckPb.CheckResult{
{
SubsystemName: k8s.KubeapiSubsystemName,
CheckDescription: k8s.KubeapiClientCheckDescription,
Status: healthcheckPb.CheckStatus_FAIL,
FriendlyMessageToUser: "This should contain instructions for fail",
},
{
SubsystemName: k8s.KubeapiSubsystemName,
CheckDescription: k8s.KubeapiAccessCheckDescription,
Status: healthcheckPb.CheckStatus_OK,
FriendlyMessageToUser: "This shouldn't be printed",
},
{
SubsystemName: k8s.KubeapiSubsystemName,
CheckDescription: k8s.KubeapiVersionCheckDescription,
Status: healthcheckPb.CheckStatus_ERROR,
FriendlyMessageToUser: "This should contain instructions for err",
},
}
hc := healthcheck.NewHealthChecker()
hc.Add("category", "check1", func() error {
return nil
})
hc.Add("category", "check2", func() error {
return fmt.Errorf("This should contain instructions for fail")
})
output := bytes.NewBufferString("")
checkStatus(output, kubeApi)
runChecks(output, hc)
goldenFileBytes, err := ioutil.ReadFile("testdata/status_busy_output.golden")
goldenFileBytes, err := ioutil.ReadFile("testdata/check_output.golden")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

View File

@ -1,17 +1,14 @@
package cmd
import (
"context"
"fmt"
"os"
"regexp"
"strings"
"time"
"github.com/linkerd/linkerd2/controller/api/public"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/healthcheck"
"github.com/linkerd/linkerd2/pkg/version"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -74,69 +71,33 @@ func init() {
// checks to determine if the client can successfully connect to the API. If the
// checks fail, then CLI will print an error and exit.
func validatedPublicAPIClient() pb.ApiClient {
client, err := newPublicAPIClient()
if err != nil {
fmt.Fprintf(os.Stderr, "Cannot connect to Kubernetes: %s\n", err)
os.Exit(1)
}
hc := healthcheck.NewHealthChecker()
hc.AddKubernetesAPIChecks(kubeconfigPath)
hc.AddLinkerdAPIChecks(apiAddr, controlPlaneNamespace)
var selfCheckWithRetry func() error
selfCheckWithRetry = func() error {
res, err := client.SelfCheck(context.Background(), &healthcheckPb.SelfCheckRequest{})
exitOnError := func(category, description string, err error) {
if err != nil {
return err
}
for _, result := range res.Results {
if result.Status != healthcheckPb.CheckStatus_OK {
// If the control plane can't talk to Prometheus, that's likely a result
// of Prometheus not passing its readiness check yet on startup. In that
// case, print a waiting message and retry.
if result.SubsystemName == public.PromClientSubsystemName {
fmt.Fprintln(os.Stderr, "Waiting for controller to connect to Prometheus")
return selfCheckWithRetry()
}
return fmt.Errorf(result.FriendlyMessageToUser)
var msg string
switch category {
case healthcheck.KubernetesAPICategory:
msg = "Cannot connect to Kubernetes"
case healthcheck.LinkerdAPICategory:
msg = "Cannot connect to Linkerd"
}
}
fmt.Fprintf(os.Stderr, "%s: %s\n", msg, err)
return nil
}
checkCmd := "linkerd check"
if controlPlaneNamespace != defaultNamespace {
checkCmd += fmt.Sprintf(" --linkerd-namespace %s", controlPlaneNamespace)
}
fmt.Fprintf(os.Stderr, "Validate the install with: %s\n", checkCmd)
if err := selfCheckWithRetry(); err != nil {
fmt.Fprintf(os.Stderr, "Cannot connect to Linkerd: %s\n", err)
checkCmd := "linkerd check"
if controlPlaneNamespace != defaultNamespace {
checkCmd += fmt.Sprintf(" --linkerd-namespace %s", controlPlaneNamespace)
}
fmt.Fprintf(os.Stderr, "Validate the install with: %s\n", checkCmd)
os.Exit(1)
}
return client
}
// newPublicAPIClient executes status checks to determine if we can connect
// to Kubernetes, and if so returns a new public API client. Otherwise it
// returns an error.
func newPublicAPIClient() (pb.ApiClient, error) {
if apiAddr != "" {
return public.NewInternalClient(controlPlaneNamespace, apiAddr)
}
kubeAPI, err := k8s.NewAPI(kubeconfigPath)
if err != nil {
return nil, err
}
for _, result := range kubeAPI.SelfCheck() {
if result.Status != healthcheckPb.CheckStatus_OK {
return nil, fmt.Errorf(result.FriendlyMessageToUser)
os.Exit(1)
}
}
return public.NewExternalClient(controlPlaneNamespace, kubeAPI)
hc.RunChecks(exitOnError)
return hc.PublicAPIClient()
}
type proxyConfigOptions struct {

2
cli/cmd/testdata/check_output.golden vendored Normal file
View File

@ -0,0 +1,2 @@
category: check1...........................................................[ok]
category: check2...........................................................[FAIL] -- This should contain instructions for fail

View File

@ -1,5 +0,0 @@
kubernetes-api: can initialize the client..................................[FAIL] -- This should contain instructions for fail
kubernetes-api: can query the Kubernetes API...............................[ok]
kubernetes-api: is running the minimum Kubernetes API version..............[ERROR] -- This should contain instructions for err
Status check results are [ERROR]

View File

@ -45,23 +45,9 @@ func (c *grpcOverHttpClient) Version(ctx context.Context, req *pb.Empty, _ ...gr
}
func (c *grpcOverHttpClient) SelfCheck(ctx context.Context, req *healthcheckPb.SelfCheckRequest, _ ...grpc.CallOption) (*healthcheckPb.SelfCheckResponse, error) {
checkResponse := &healthcheckPb.SelfCheckResponse{
Results: []*healthcheckPb.CheckResult{c.checkIfNamespaceExists()},
}
// If the namespace does not exist, abort before making the SelfCheck API call
if checkResponse.Results[0].Status != healthcheckPb.CheckStatus_OK {
return checkResponse, nil
}
var msg healthcheckPb.SelfCheckResponse
err := c.apiRequest(ctx, "SelfCheck", req, &msg)
if err != nil {
return checkResponse, err
}
checkResponse.Results = append(checkResponse.Results, msg.Results...)
return checkResponse, nil
return &msg, err
}
func (c *grpcOverHttpClient) ListPods(ctx context.Context, req *pb.ListPodsRequest, _ ...grpc.CallOption) (*pb.ListPodsResponse, error) {
@ -143,39 +129,6 @@ func (c *grpcOverHttpClient) endpointNameToPublicApiUrl(endpoint string) *url.UR
return c.serverURL.ResolveReference(&url.URL{Path: endpoint})
}
func (c *grpcOverHttpClient) checkIfNamespaceExists() *healthcheckPb.CheckResult {
checkResult := &healthcheckPb.CheckResult{
SubsystemName: "namespace",
CheckDescription: "control plane namespace exists",
Status: healthcheckPb.CheckStatus_OK,
}
url := *c.serverURL
url.Path = fmt.Sprintf("/api/v1/namespaces/%s", c.controlPlaneNamespace)
log.Debugf("Making GET request to [%s]", url.String())
rsp, err := c.httpClient.Get(url.String())
if err != nil {
checkResult.Status = healthcheckPb.CheckStatus_ERROR
checkResult.FriendlyMessageToUser = fmt.Sprintf("Error calling the Kubernetes API: %s", err)
return checkResult
}
if rsp.StatusCode == http.StatusNotFound {
checkResult.Status = healthcheckPb.CheckStatus_FAIL
checkResult.FriendlyMessageToUser = fmt.Sprintf("The \"%s\" namespace does not exist", c.controlPlaneNamespace)
return checkResult
}
if rsp.StatusCode != http.StatusOK {
checkResult.Status = healthcheckPb.CheckStatus_ERROR
checkResult.FriendlyMessageToUser = fmt.Sprintf("Unexpected Kubernetes API response: %s", rsp.Status)
return checkResult
}
return checkResult
}
type tapClient struct {
ctx context.Context
reader *bufio.Reader
@ -225,8 +178,8 @@ func newClient(apiURL *url.URL, httpClientToUse *http.Client, controlPlaneNamesp
}, nil
}
func NewInternalClient(controlPlaneNamespace string, kubernetesApiHost string) (pb.ApiClient, error) {
apiURL, err := url.Parse(fmt.Sprintf("http://%s/", kubernetesApiHost))
func NewInternalClient(controlPlaneNamespace string, kubeAPIHost string) (pb.ApiClient, error) {
apiURL, err := url.Parse(fmt.Sprintf("http://%s/", kubeAPIHost))
if err != nil {
return nil, err
}
@ -234,13 +187,13 @@ func NewInternalClient(controlPlaneNamespace string, kubernetesApiHost string) (
return newClient(apiURL, http.DefaultClient, controlPlaneNamespace)
}
func NewExternalClient(controlPlaneNamespace string, kubeApi k8s.KubernetesApi) (pb.ApiClient, error) {
apiURL, err := kubeApi.UrlFor(controlPlaneNamespace, "/services/http:api:http/proxy/")
func NewExternalClient(controlPlaneNamespace string, kubeAPI *k8s.KubernetesAPI) (pb.ApiClient, error) {
apiURL, err := kubeAPI.UrlFor(controlPlaneNamespace, "/services/http:api:http/proxy/")
if err != nil {
return nil, err
}
httpClientToUse, err := kubeApi.NewClient()
httpClientToUse, err := kubeAPI.NewClient()
if err != nil {
return nil, err
}

View File

@ -11,7 +11,6 @@ import (
"testing"
"github.com/golang/protobuf/proto"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
pb "github.com/linkerd/linkerd2/controller/gen/public"
)
@ -154,54 +153,6 @@ func TestFromByteStreamToProtocolBuffers(t *testing.T) {
})
}
func TestSelfCheck(t *testing.T) {
t.Run("Returns failed status check if namespace does not exist", func(t *testing.T) {
mockTransport := &mockTransport{}
mockTransport.responseToReturn = &http.Response{
StatusCode: 404,
}
mockHttpClient := &http.Client{
Transport: mockTransport,
}
apiURL := &url.URL{
Scheme: "http",
Host: "some-hostname",
Path: "/",
}
client, err := newClient(apiURL, mockHttpClient, "testnamespace")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rsp, err := client.SelfCheck(context.Background(), &healthcheckPb.SelfCheckRequest{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
expectedUrlRequested := "http://some-hostname/api/v1/namespaces/testnamespace"
actualUrlRequested := mockTransport.requestSent.URL.String()
if actualUrlRequested != expectedUrlRequested {
t.Fatalf("Expected request to URL [%v], but got [%v]", expectedUrlRequested, actualUrlRequested)
}
if len(rsp.Results) != 1 {
t.Fatalf("Expected one check result, got %v", rsp.Results)
}
expectedCheckResult := &healthcheckPb.CheckResult{
SubsystemName: "namespace",
CheckDescription: "control plane namespace exists",
Status: healthcheckPb.CheckStatus_FAIL,
FriendlyMessageToUser: "The \"testnamespace\" namespace does not exist",
}
if !proto.Equal(rsp.Results[0], expectedCheckResult) {
t.Fatalf("Expected check result to be [%v], but got [%v]", expectedCheckResult, rsp.Results[0])
}
})
}
func bufferedReader(t *testing.T, msg proto.Message) *bufio.Reader {
msgBytes, err := proto.Marshal(msg)
if err != nil {

View File

@ -1,50 +0,0 @@
package healthcheck
import (
"context"
"fmt"
"time"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
"google.golang.org/grpc"
)
const grpcApiSubsystemName = "linkerd-api"
type grpcStatusChecker interface {
SelfCheck(ctx context.Context, in *healthcheckPb.SelfCheckRequest, opts ...grpc.CallOption) (*healthcheckPb.SelfCheckResponse, error)
}
type statusCheckerProxy struct {
delegate grpcStatusChecker
}
func (proxy *statusCheckerProxy) SelfCheck() []*healthcheckPb.CheckResult {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
selfCheckResponse, err := proxy.delegate.SelfCheck(ctx, &healthcheckPb.SelfCheckRequest{})
if err != nil {
return []*healthcheckPb.CheckResult{
&healthcheckPb.CheckResult{
SubsystemName: grpcApiSubsystemName,
CheckDescription: "can query the Linkerd API",
Status: healthcheckPb.CheckStatus_ERROR,
FriendlyMessageToUser: err.Error(),
},
}
}
for _, check := range selfCheckResponse.Results {
fullSubsystemName := fmt.Sprintf("%s[%s]", grpcApiSubsystemName, check.SubsystemName)
check.SubsystemName = fullSubsystemName
}
return selfCheckResponse.Results
}
func NewGrpcStatusChecker(grpClient grpcStatusChecker) StatusChecker {
return &statusCheckerProxy{
delegate: grpClient,
}
}

View File

@ -1,46 +1,224 @@
package healthcheck
import (
"context"
"fmt"
"net/http"
"time"
"github.com/linkerd/linkerd2/controller/api/public"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/version"
k8sVersion "k8s.io/apimachinery/pkg/version"
)
type StatusChecker interface {
SelfCheck() []*healthcheckPb.CheckResult
const (
KubernetesAPICategory = "kubernetes-api"
LinkerdAPICategory = "linkerd-api"
LinkerdVersionCategory = "linkerd-version"
)
type checker struct {
category string
description string
fatal bool
check func() error
checkRPC func() (*healthcheckPb.SelfCheckResponse, error)
}
type CheckObserver func(result *healthcheckPb.CheckResult)
type checkObserver func(string, string, error)
type HealthChecker struct {
subsystemsToCheck []StatusChecker
checkers []*checker
kubeAPI *k8s.KubernetesAPI
httpClient *http.Client
kubeVersion *k8sVersion.Info
apiClient pb.ApiClient
latestVersion string
}
func (hC *HealthChecker) Add(subsystemChecker StatusChecker) {
hC.subsystemsToCheck = append(hC.subsystemsToCheck, subsystemChecker)
func NewHealthChecker() *HealthChecker {
return &HealthChecker{
checkers: make([]*checker, 0),
}
}
func (hC *HealthChecker) PerformCheck(observer CheckObserver) healthcheckPb.CheckStatus {
var overallStatus healthcheckPb.CheckStatus
// AddKubernetesAPIChecks adds a series of checks to validate that the caller is
// configured to interact with a working Kubernetes cluster and that the cluster
// meets the minimum version requirement.
func (hc *HealthChecker) AddKubernetesAPIChecks(kubeconfigPath string) {
hc.checkers = append(hc.checkers, &checker{
category: KubernetesAPICategory,
description: "can initialize the client",
fatal: true,
check: func() (err error) {
hc.kubeAPI, err = k8s.NewAPI(kubeconfigPath)
return
},
})
for _, checker := range hC.subsystemsToCheck {
for _, singleResult := range checker.SelfCheck() {
checkResultContainsError := singleResult.Status == healthcheckPb.CheckStatus_ERROR
shouldOverrideStatus := singleResult.Status == healthcheckPb.CheckStatus_FAIL && overallStatus == healthcheckPb.CheckStatus_OK
hc.checkers = append(hc.checkers, &checker{
category: KubernetesAPICategory,
description: "can query the Kubernetes API",
fatal: true,
check: func() (err error) {
hc.httpClient, err = hc.kubeAPI.NewClient()
if err != nil {
return
}
hc.kubeVersion, err = hc.kubeAPI.GetVersionInfo(hc.httpClient)
return
},
})
if checkResultContainsError || shouldOverrideStatus {
overallStatus = singleResult.Status
hc.checkers = append(hc.checkers, &checker{
category: KubernetesAPICategory,
description: "is running the minimum Kubernetes API version",
fatal: false,
check: func() error {
return hc.kubeAPI.CheckVersion(hc.kubeVersion)
},
})
}
// AddLinkerdAPIChecks adds a series of checks to validate that the control
// plane namespace exists and that it's successfully serving the public API.
// These checks are dependent on the output of AddKubernetesAPIChecks, so those
// checks must be added first.
func (hc *HealthChecker) AddLinkerdAPIChecks(apiAddr, controlPlaneNamespace string) {
hc.checkers = append(hc.checkers, &checker{
category: LinkerdAPICategory,
description: "control plane namespace exists",
fatal: true,
check: func() error {
return hc.kubeAPI.CheckNamespaceExists(hc.httpClient, controlPlaneNamespace)
},
})
hc.checkers = append(hc.checkers, &checker{
category: LinkerdAPICategory,
description: "can initialize the client",
fatal: true,
check: func() (err error) {
if apiAddr != "" {
hc.apiClient, err = public.NewInternalClient(controlPlaneNamespace, apiAddr)
} else {
hc.apiClient, err = public.NewExternalClient(controlPlaneNamespace, hc.kubeAPI)
}
return
},
})
hc.checkers = append(hc.checkers, &checker{
category: LinkerdAPICategory,
description: "can query the control plane API",
fatal: true,
checkRPC: func() (*healthcheckPb.SelfCheckResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return hc.apiClient.SelfCheck(ctx, &healthcheckPb.SelfCheckRequest{})
},
})
}
// AddLinkerdVersionChecks adds a series of checks to validate that the CLI and
// control plane are running the latest available version. These checks are
// dependent on the output of AddLinkerdAPIChecks, so those checks must be added
// first.
func (hc *HealthChecker) AddLinkerdVersionChecks(versionOverride string) {
hc.checkers = append(hc.checkers, &checker{
category: LinkerdVersionCategory,
description: "can determine the latest version",
fatal: true,
check: func() (err error) {
if versionOverride != "" {
hc.latestVersion = versionOverride
} else {
hc.latestVersion, err = version.GetLatestVersion()
}
return
},
})
hc.checkers = append(hc.checkers, &checker{
category: LinkerdVersionCategory,
description: "cli is up-to-date",
fatal: false,
check: func() error {
return version.CheckClientVersion(hc.latestVersion)
},
})
hc.checkers = append(hc.checkers, &checker{
category: LinkerdVersionCategory,
description: "control plane is up-to-date",
fatal: false,
check: func() error {
return version.CheckServerVersion(hc.apiClient, hc.latestVersion)
},
})
}
// Add adds an arbitrary checker. This should only be used for testing. For
// production code, add sets of checkers using the `Add*` functions above.
func (hc *HealthChecker) Add(category, description string, check func() error) {
hc.checkers = append(hc.checkers, &checker{
category: category,
description: description,
check: check,
})
}
// RunChecks runs all configured checkers, and passes the results of each
// check to the observer. If a check fails and is marked as fatal, then all
// remaining checks are skipped. If at least one check fails, RunChecks returns
// false; if all checks passed, RunChecks returns true.
func (hc *HealthChecker) RunChecks(observer checkObserver) bool {
success := true
for _, checker := range hc.checkers {
if checker.check != nil {
err := checker.check()
observer(checker.category, checker.description, err)
if err != nil {
success = false
if checker.fatal {
break
}
}
}
if checker.checkRPC != nil {
checkRsp, err := checker.checkRPC()
observer(checker.category, checker.description, err)
if err != nil {
success = false
if checker.fatal {
break
}
continue
}
if observer != nil {
observer(singleResult)
for _, check := range checkRsp.Results {
category := fmt.Sprintf("%s[%s]", checker.category, check.SubsystemName)
var err error
if check.Status != healthcheckPb.CheckStatus_OK {
success = false
err = fmt.Errorf(check.FriendlyMessageToUser)
}
observer(category, check.CheckDescription, err)
}
}
}
return overallStatus
return success
}
func MakeHealthChecker() *HealthChecker {
return &HealthChecker{
subsystemsToCheck: make([]StatusChecker, 0),
}
// PublicAPIClient returns a fully configured public API client. This client
// is only configured if the AddKubernetesAPIChecks, AddLinkerdAPIChecks, and
// RunChecks functions have already been called.
func (hc *HealthChecker) PublicAPIClient() pb.ApiClient {
return hc.apiClient
}

View File

@ -1,125 +1,206 @@
package healthcheck
import (
"context"
"fmt"
"reflect"
"testing"
"github.com/linkerd/linkerd2/controller/api/public"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
)
type mockSubsystem struct {
checksToReturn []*healthcheckPb.CheckResult
}
func TestHealthChecker(t *testing.T) {
nullObserver := func(_, _ string, _ error) {}
func (m *mockSubsystem) SelfCheck() []*healthcheckPb.CheckResult {
return m.checksToReturn
}
func TestSelfChecker(t *testing.T) {
workingSubsystem1 := &mockSubsystem{
checksToReturn: []*healthcheckPb.CheckResult{
{SubsystemName: "w1", CheckDescription: "w1a", Status: healthcheckPb.CheckStatus_OK},
{SubsystemName: "w1", CheckDescription: "w1b", Status: healthcheckPb.CheckStatus_OK},
},
}
workingSubsystem2 := &mockSubsystem{
checksToReturn: []*healthcheckPb.CheckResult{
{SubsystemName: "w2", CheckDescription: "w2a", Status: healthcheckPb.CheckStatus_OK},
{SubsystemName: "w2", CheckDescription: "w2b", Status: healthcheckPb.CheckStatus_OK},
passingCheck1 := &checker{
category: "cat1",
description: "desc1",
check: func() error {
return nil
},
}
failingSubsystem1 := &mockSubsystem{
checksToReturn: []*healthcheckPb.CheckResult{
{SubsystemName: "f1", CheckDescription: "fa", Status: healthcheckPb.CheckStatus_OK},
{SubsystemName: "f1", CheckDescription: "fb", Status: healthcheckPb.CheckStatus_FAIL},
passingCheck2 := &checker{
category: "cat2",
description: "desc2",
check: func() error {
return nil
},
}
errorSubsystem1 := &mockSubsystem{
checksToReturn: []*healthcheckPb.CheckResult{
{SubsystemName: "e1", CheckDescription: "ea", Status: healthcheckPb.CheckStatus_ERROR},
{SubsystemName: "e1", CheckDescription: "eb", Status: healthcheckPb.CheckStatus_OK},
failingCheck := &checker{
category: "cat3",
description: "desc3",
check: func() error {
return fmt.Errorf("error")
},
}
passingRPCClient := public.MockApiClient{
SelfCheckResponseToReturn: &healthcheckPb.SelfCheckResponse{
Results: []*healthcheckPb.CheckResult{
&healthcheckPb.CheckResult{
SubsystemName: "rpc1",
CheckDescription: "rpc desc1",
Status: healthcheckPb.CheckStatus_OK,
},
},
},
}
passingRPCCheck := &checker{
category: "cat4",
description: "desc4",
checkRPC: func() (*healthcheckPb.SelfCheckResponse, error) {
return passingRPCClient.SelfCheck(context.Background(),
&healthcheckPb.SelfCheckRequest{})
},
}
failingRPCClient := public.MockApiClient{
SelfCheckResponseToReturn: &healthcheckPb.SelfCheckResponse{
Results: []*healthcheckPb.CheckResult{
&healthcheckPb.CheckResult{
SubsystemName: "rpc2",
CheckDescription: "rpc desc2",
Status: healthcheckPb.CheckStatus_FAIL,
FriendlyMessageToUser: "rpc error",
},
},
},
}
failingRPCCheck := &checker{
category: "cat5",
description: "desc5",
checkRPC: func() (*healthcheckPb.SelfCheckResponse, error) {
return failingRPCClient.SelfCheck(context.Background(),
&healthcheckPb.SelfCheckRequest{})
},
}
fatalCheck := &checker{
category: "cat6",
description: "desc6",
fatal: true,
check: func() error {
return fmt.Errorf("fatal")
},
}
t.Run("Notifies observer of all results", func(t *testing.T) {
healthChecker := MakeHealthChecker()
healthChecker.Add(workingSubsystem1)
healthChecker.Add(workingSubsystem2)
healthChecker.Add(failingSubsystem1)
observedResults := make([]*healthcheckPb.CheckResult, 0)
observer := func(r *healthcheckPb.CheckResult) {
observedResults = append(observedResults, r)
hc := HealthChecker{
checkers: []*checker{
passingCheck1,
passingCheck2,
failingCheck,
passingRPCCheck,
failingRPCCheck,
},
}
expectedResults := make([]*healthcheckPb.CheckResult, 0)
expectedResults = append(expectedResults, workingSubsystem1.checksToReturn...)
expectedResults = append(expectedResults, workingSubsystem2.checksToReturn...)
expectedResults = append(expectedResults, failingSubsystem1.checksToReturn...)
healthChecker.PerformCheck(observer)
observedLength := len(observedResults)
expectedLength := len(expectedResults)
if expectedLength != observedLength {
t.Fatalf("Expecting observed check to contain [%d] check, got [%d]", expectedLength, observedLength)
}
observedResultsSet := make(map[string]bool)
for _, result := range observedResults {
observedResultsSet[result.CheckDescription] = true
}
for _, expected := range expectedResults {
if !observedResultsSet[expected.CheckDescription] {
t.Fatalf("Expected observed results to contain [%v], but was: %v", expected,
reflect.ValueOf(observedResultsSet).MapKeys())
observedResults := make([]string, 0)
observer := func(category, description string, err error) {
res := fmt.Sprintf("%s %s", category, description)
if err != nil {
res += fmt.Sprintf(": %s", err)
}
observedResults = append(observedResults, res)
}
expectedResults := []string{
"cat1 desc1",
"cat2 desc2",
"cat3 desc3: error",
"cat4 desc4",
"cat4[rpc1] rpc desc1",
"cat5 desc5",
"cat5[rpc2] rpc desc2: rpc error",
}
hc.RunChecks(observer)
if !reflect.DeepEqual(observedResults, expectedResults) {
t.Fatalf("Expected results %v, but got %v", expectedResults, observedResults)
}
})
t.Run("Is successful if all checks were successful", func(t *testing.T) {
healthChecker := MakeHealthChecker()
hc := HealthChecker{
checkers: []*checker{
passingCheck1,
passingCheck2,
passingRPCCheck,
},
}
healthChecker.Add(workingSubsystem1)
healthChecker.Add(workingSubsystem2)
success := hc.RunChecks(nullObserver)
checkStatus := healthChecker.PerformCheck(nil)
if checkStatus != healthcheckPb.CheckStatus_OK {
t.Fatalf("Expecting check to be successful, but got [%s]", checkStatus)
if !success {
t.Fatalf("Expecting checks to be successful, but got [%t]", success)
}
})
t.Run("Is failure if even a single test failed and no errors", func(t *testing.T) {
healthChecker := MakeHealthChecker()
t.Run("Is not successful if one check fails", func(t *testing.T) {
hc := HealthChecker{
checkers: []*checker{
passingCheck1,
failingCheck,
passingCheck2,
},
}
healthChecker.Add(workingSubsystem1)
healthChecker.Add(failingSubsystem1)
healthChecker.Add(workingSubsystem2)
success := hc.RunChecks(nullObserver)
checkStatus := healthChecker.PerformCheck(nil)
if checkStatus != healthcheckPb.CheckStatus_FAIL {
t.Fatalf("Expecting check to be error, but got [%s]", checkStatus)
if success {
t.Fatalf("Expecting checks to not be successful, but got [%t]", success)
}
})
t.Run("Is error if even a single test errored", func(t *testing.T) {
healthChecker := MakeHealthChecker()
t.Run("Is not successful if one RPC check fails", func(t *testing.T) {
hc := HealthChecker{
checkers: []*checker{
passingCheck1,
failingRPCCheck,
passingCheck2,
},
}
healthChecker.Add(workingSubsystem1)
healthChecker.Add(failingSubsystem1)
healthChecker.Add(errorSubsystem1)
success := hc.RunChecks(nullObserver)
checkStatus := healthChecker.PerformCheck(nil)
if success {
t.Fatalf("Expecting checks to not be successful, but got [%t]", success)
}
})
if checkStatus != healthcheckPb.CheckStatus_ERROR {
t.Fatalf("Expecting check to be error, but got [%s]", checkStatus)
t.Run("Does not run remaining check if fatal check fails", func(t *testing.T) {
hc := HealthChecker{
checkers: []*checker{
passingCheck1,
fatalCheck,
passingCheck2,
},
}
observedResults := make([]string, 0)
observer := func(category, description string, err error) {
res := fmt.Sprintf("%s %s", category, description)
if err != nil {
res += fmt.Sprintf(": %s", err)
}
observedResults = append(observedResults, res)
}
expectedResults := []string{
"cat1 desc1",
"cat6 desc6: fatal",
}
hc.RunChecks(observer)
if !reflect.DeepEqual(observedResults, expectedResults) {
t.Fatalf("Expected results %v, but got %v", expectedResults, observedResults)
}
})
}

View File

@ -9,35 +9,20 @@ import (
"net/url"
"time"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
"github.com/linkerd/linkerd2/pkg/healthcheck"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/rest"
// Load all the auth plugins for the cloud providers.
_ "k8s.io/client-go/plugin/pkg/client/auth"
)
const (
KubeapiSubsystemName = "kubernetes-api"
KubeapiClientCheckDescription = "can initialize the client"
KubeapiAccessCheckDescription = "can query the Kubernetes API"
KubeapiVersionCheckDescription = "is running the minimum Kubernetes API version"
)
var minApiVersion = [3]int{1, 8, 0}
type KubernetesApi interface {
UrlFor(namespace string, extraPathStartingWithSlash string) (*url.URL, error)
NewClient() (*http.Client, error)
healthcheck.StatusChecker
}
type kubernetesApi struct {
type KubernetesAPI struct {
*rest.Config
}
func (kubeapi *kubernetesApi) NewClient() (*http.Client, error) {
secureTransport, err := rest.TransportFor(kubeapi.Config)
func (kubeAPI *KubernetesAPI) NewClient() (*http.Client, error) {
secureTransport, err := rest.TransportFor(kubeAPI.Config)
if err != nil {
return nil, fmt.Errorf("error instantiating Kubernetes API client: %v", err)
}
@ -47,127 +32,98 @@ func (kubeapi *kubernetesApi) NewClient() (*http.Client, error) {
}, nil
}
func (kubeapi *kubernetesApi) SelfCheck() (checks []*healthcheckPb.CheckResult) {
apiConnectivityCheck, client := kubeapi.checkApiConnectivity()
checks = append(checks, apiConnectivityCheck)
if apiConnectivityCheck.Status != healthcheckPb.CheckStatus_OK {
return
}
apiAccessCheck, versionRsp := kubeapi.checkApiAccess(client)
checks = append(checks, apiAccessCheck)
if apiAccessCheck.Status != healthcheckPb.CheckStatus_OK {
return
}
checks = append(checks, kubeapi.checkApiVersion(versionRsp))
return
}
func (kubeapi *kubernetesApi) checkApiConnectivity() (*healthcheckPb.CheckResult, *http.Client) {
checkResult := &healthcheckPb.CheckResult{
Status: healthcheckPb.CheckStatus_OK,
SubsystemName: KubeapiSubsystemName,
CheckDescription: KubeapiClientCheckDescription,
}
client, err := kubeapi.NewClient()
func (kubeAPI *KubernetesAPI) GetVersionInfo(client *http.Client) (*version.Info, error) {
endpoint, err := url.Parse(kubeAPI.Host + "/version")
if err != nil {
checkResult.Status = healthcheckPb.CheckStatus_ERROR
checkResult.FriendlyMessageToUser = fmt.Sprintf("Error connecting to the API. Error message is [%s]", err.Error())
return checkResult, client
return nil, err
}
return checkResult, client
}
func (kubeapi *kubernetesApi) checkApiAccess(client *http.Client) (*healthcheckPb.CheckResult, string) {
checkResult := &healthcheckPb.CheckResult{
Status: healthcheckPb.CheckStatus_OK,
SubsystemName: KubeapiSubsystemName,
CheckDescription: KubeapiAccessCheckDescription,
}
endpointToCheck, err := url.Parse(kubeapi.Host + "/version")
req, err := http.NewRequest("GET", endpoint.String(), nil)
if err != nil {
checkResult.Status = healthcheckPb.CheckStatus_ERROR
checkResult.FriendlyMessageToUser = fmt.Sprintf("Error configuring the Kubernetes API host %s: %s", kubeapi.Host, err)
return checkResult, ""
return nil, err
}
req, _ := http.NewRequest("GET", endpointToCheck.String(), nil)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.Do(req.WithContext(ctx))
rsp, err := client.Do(req.WithContext(ctx))
if err != nil {
checkResult.Status = healthcheckPb.CheckStatus_ERROR
checkResult.FriendlyMessageToUser = fmt.Sprintf("Error calling the Kubernetes API: %s", err)
return checkResult, ""
return nil, err
}
defer resp.Body.Close()
defer rsp.Body.Close()
bytes, err := ioutil.ReadAll(resp.Body)
if rsp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Unexpected Kubernetes API response: %s", rsp.Status)
}
bytes, err := ioutil.ReadAll(rsp.Body)
if err != nil {
checkResult.Status = healthcheckPb.CheckStatus_ERROR
checkResult.FriendlyMessageToUser = fmt.Sprintf("Error reading Kubernetes API response body: %s", err)
return checkResult, ""
}
body := string(bytes)
if resp.StatusCode != http.StatusOK {
checkResult.Status = healthcheckPb.CheckStatus_FAIL
checkResult.FriendlyMessageToUser = fmt.Sprintf("Unexpected Kubernetes API response: %s, body: %s", resp.Status, body)
return checkResult, ""
}
return checkResult, body
}
func (kubeapi *kubernetesApi) checkApiVersion(versionRsp string) *healthcheckPb.CheckResult {
checkResult := &healthcheckPb.CheckResult{
Status: healthcheckPb.CheckStatus_OK,
SubsystemName: KubeapiSubsystemName,
CheckDescription: KubeapiVersionCheckDescription,
return nil, err
}
var versionInfo version.Info
err := json.Unmarshal([]byte(versionRsp), &versionInfo)
if err != nil {
checkResult.Status = healthcheckPb.CheckStatus_ERROR
checkResult.FriendlyMessageToUser = fmt.Sprintf("Version endpoint returned invalid JSON: [%v]", versionRsp)
return checkResult
}
err = json.Unmarshal(bytes, &versionInfo)
return &versionInfo, err
}
func (kubeAPI *KubernetesAPI) CheckVersion(versionInfo *version.Info) error {
apiVersion, err := getK8sVersion(versionInfo.String())
if err != nil {
checkResult.Status = healthcheckPb.CheckStatus_ERROR
checkResult.FriendlyMessageToUser = fmt.Sprintf("Failed to parse version [%s]: %s", versionInfo.String(), err)
return checkResult
return err
}
if !isCompatibleVersion(minApiVersion, apiVersion) {
checkResult.Status = healthcheckPb.CheckStatus_FAIL
checkResult.FriendlyMessageToUser = fmt.Sprintf("Kubernetes is on version [%d.%d.%d], but version [%d.%d.%d] or more recent is required.",
return fmt.Errorf("Kubernetes is on version [%d.%d.%d], but version [%d.%d.%d] or more recent is required",
apiVersion[0], apiVersion[1], apiVersion[2],
minApiVersion[0], minApiVersion[1], minApiVersion[2])
return checkResult
}
return checkResult
return nil
}
func (kubeAPI *KubernetesAPI) CheckNamespaceExists(client *http.Client, namespace string) error {
endpoint, err := url.Parse(kubeAPI.Host + "/api/v1/namespaces/" + namespace)
if err != nil {
return err
}
req, err := http.NewRequest("GET", endpoint.String(), nil)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
rsp, err := client.Do(req.WithContext(ctx))
if err != nil {
return err
}
defer rsp.Body.Close()
if rsp.StatusCode == http.StatusNotFound {
return fmt.Errorf("The \"%s\" namespace does not exist", namespace)
}
if rsp.StatusCode != http.StatusOK {
return fmt.Errorf("Unexpected Kubernetes API response: %s", rsp.Status)
}
return nil
}
// UrlFor generates a URL based on the Kubernetes config.
func (kubeapi *kubernetesApi) UrlFor(namespace string, extraPathStartingWithSlash string) (*url.URL, error) {
return generateKubernetesApiBaseUrlFor(kubeapi.Host, namespace, extraPathStartingWithSlash)
func (kubeAPI *KubernetesAPI) UrlFor(namespace string, extraPathStartingWithSlash string) (*url.URL, error) {
return generateKubernetesApiBaseUrlFor(kubeAPI.Host, namespace, extraPathStartingWithSlash)
}
// NewAPI returns a new KubernetesApi interface
func NewAPI(configPath string) (KubernetesApi, error) {
// NewAPI validates a Kubernetes config and returns a client for accessing the
// configured cluster
func NewAPI(configPath string) (*KubernetesAPI, error) {
config, err := getConfig(configPath)
if err != nil {
return nil, fmt.Errorf("error configuring Kubernetes API client: %v", err)
}
return &kubernetesApi{Config: config}, nil
return &KubernetesAPI{Config: config}, nil
}

View File

@ -1,31 +0,0 @@
package k8s
import (
"net/http"
"net/url"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
)
type MockKubeApi struct {
SelfCheckResultsToReturn []*healthcheckPb.CheckResult
UrlForNamespaceReceived string
UrlExtraPathStartingWithSlashReceived string
UrlForUrlToReturn *url.URL
NewClientClientToReturn *http.Client
ErrorToReturn error
}
func (m *MockKubeApi) UrlFor(namespace string, extraPathStartingWithSlash string) (*url.URL, error) {
m.UrlForNamespaceReceived = namespace
m.UrlExtraPathStartingWithSlashReceived = extraPathStartingWithSlash
return m.UrlForUrlToReturn, m.ErrorToReturn
}
func (m *MockKubeApi) NewClient() (*http.Client, error) {
return m.NewClientClientToReturn, m.ErrorToReturn
}
func (m *MockKubeApi) SelfCheck() []*healthcheckPb.CheckResult {
return m.SelfCheckResultsToReturn
}

View File

@ -9,9 +9,7 @@ import (
"os"
"time"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/healthcheck"
)
// DO NOT EDIT
@ -19,10 +17,8 @@ import (
var Version = undefinedVersion
const (
undefinedVersion = "undefined"
VersionSubsystemName = "linkerd-version"
CliCheckDescription = "cli is up-to-date"
ControlPlaneCheckDescription = "control plane is up-to-date"
undefinedVersion = "undefined"
versionCheckURL = "https://versioncheck.linkerd.io/version.json"
)
func init() {
@ -39,104 +35,61 @@ func init() {
}
}
var httpClientTimeout = 10 * time.Second
func CheckClientVersion(latestVersion string) error {
if Version != latestVersion {
return fmt.Errorf("is running version %s but the latest version is %s",
Version, latestVersion)
}
type versionStatusChecker struct {
version string
versionCheckURL string
versionOverride string
publicApiClient pb.ApiClient
httpClient http.Client
return nil
}
func (v versionStatusChecker) SelfCheck() []*healthcheckPb.CheckResult {
cliVersion := v.version
cliIsUpToDate := &healthcheckPb.CheckResult{
Status: healthcheckPb.CheckStatus_OK,
SubsystemName: VersionSubsystemName,
CheckDescription: CliCheckDescription,
}
latestVersion, err := v.getLatestVersion()
if err != nil {
cliIsUpToDate.Status = healthcheckPb.CheckStatus_ERROR
cliIsUpToDate.FriendlyMessageToUser = err.Error()
return []*healthcheckPb.CheckResult{cliIsUpToDate}
}
if cliVersion != latestVersion {
cliIsUpToDate.Status = healthcheckPb.CheckStatus_FAIL
cliIsUpToDate.FriendlyMessageToUser = fmt.Sprintf("is running version %s but the latest version is %s", cliVersion, latestVersion)
}
controlPlaneIsUpToDate := &healthcheckPb.CheckResult{
Status: healthcheckPb.CheckStatus_OK,
SubsystemName: VersionSubsystemName,
CheckDescription: ControlPlaneCheckDescription,
}
controlPlaneVersion, err := v.getServerVersion()
if err != nil {
controlPlaneIsUpToDate.Status = healthcheckPb.CheckStatus_ERROR
controlPlaneIsUpToDate.FriendlyMessageToUser = err.Error()
return []*healthcheckPb.CheckResult{controlPlaneIsUpToDate}
}
if controlPlaneVersion != latestVersion {
controlPlaneIsUpToDate.Status = healthcheckPb.CheckStatus_FAIL
controlPlaneIsUpToDate.FriendlyMessageToUser = fmt.Sprintf("is running version %s but the latest version is %s", controlPlaneVersion, latestVersion)
}
checks := []*healthcheckPb.CheckResult{cliIsUpToDate}
checks = append(checks, controlPlaneIsUpToDate)
return checks
}
func (v versionStatusChecker) getServerVersion() (string, error) {
func CheckServerVersion(apiClient pb.ApiClient, latestVersion string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := v.publicApiClient.Version(ctx, &pb.Empty{})
rsp, err := apiClient.Version(ctx, &pb.Empty{})
if err != nil {
return "", err
return err
}
return resp.GetReleaseVersion(), nil
if rsp.GetReleaseVersion() != latestVersion {
return fmt.Errorf("is running version %s but the latest version is %s",
rsp.GetReleaseVersion(), latestVersion)
}
return nil
}
func (v versionStatusChecker) getLatestVersion() (string, error) {
if v.versionOverride != "" {
return v.versionOverride, nil
}
resp, err := v.httpClient.Get(v.versionCheckURL)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return "", fmt.Errorf("got %d error from %s", resp.StatusCode, v.versionCheckURL)
}
bodyBytes, err := ioutil.ReadAll(resp.Body)
func GetLatestVersion() (string, error) {
req, err := http.NewRequest("GET", versionCheckURL, nil)
if err != nil {
return "", err
}
var l map[string]string
err = json.Unmarshal(bodyBytes, &l)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
rsp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return "", err
}
defer rsp.Body.Close()
if rsp.StatusCode != 200 {
return "", fmt.Errorf("Unexpected versioncheck response: %s", rsp.Status)
}
bytes, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return "", err
}
return l["version"], nil
}
func NewVersionStatusChecker(versionCheckURL, versionOverride string, client pb.ApiClient) healthcheck.StatusChecker {
return versionStatusChecker{
version: Version,
versionCheckURL: versionCheckURL,
versionOverride: versionOverride,
publicApiClient: client,
httpClient: http.Client{Timeout: httpClientTimeout},
}
var versionRsp map[string]string
err = json.Unmarshal(bytes, &versionRsp)
if err != nil {
return "", err
}
return versionRsp["version"], nil
}

View File

@ -1,122 +1,43 @@
package version_test
import (
"fmt"
"net/http"
"testing"
"time"
"github.com/linkerd/linkerd2/controller/api/public"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/version"
)
func TestVersionCheck(t *testing.T) {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "{\"version\": \"v0.3.0\"}")
})
go http.ListenAndServe("localhost:23456", nil)
// wait for HTTP server to initialize
readyCh := make(chan struct{})
go func() {
for {
_, err := http.Head("http://localhost:23456/")
if err == nil {
close(readyCh)
break
}
}
}()
select {
case <-readyCh:
case <-time.After(5 * time.Second):
t.Fatalf("Failed to initialize HTTP server")
}
t.Run("Passes when versions are latest", func(t *testing.T) {
version.Version = "v0.3.0"
mockPublicApi := createMockPublicApi("v0.3.0")
versionStatusChecker := version.NewVersionStatusChecker("http://localhost:23456/", "", mockPublicApi)
checks := versionStatusChecker.SelfCheck()
expectedName := version.VersionSubsystemName
if checks[0].SubsystemName != expectedName {
t.Fatalf("Expecting check name to be [%s], got [%s]", expectedName, checks[0].SubsystemName)
}
if checks[1].SubsystemName != expectedName {
t.Fatalf("Expecting check name to be [%s], got [%s]", expectedName, checks[0].SubsystemName)
}
expectedStatus := healthcheckPb.CheckStatus_OK
if checks[0].Status != expectedStatus {
t.Fatalf("Expecting cli check status to be [%d], got [%d]", expectedStatus, checks[0].Status)
}
if checks[1].Status != expectedStatus {
t.Fatalf("Expecting control plane check status to be [%d], got [%d]", expectedStatus, checks[1].Status)
}
expectedDescription := version.CliCheckDescription
if checks[0].CheckDescription != expectedDescription {
t.Fatalf("Expecting check description to be [%s], got [%s]", expectedDescription, checks[0].CheckDescription)
}
expectedDescription = version.ControlPlaneCheckDescription
if checks[1].CheckDescription != expectedDescription {
t.Fatalf("Expecting check description to be [%s], got [%s]", expectedDescription, checks[0].CheckDescription)
func TestCheckClientVersion(t *testing.T) {
t.Run("Passes when client version matches", func(t *testing.T) {
err := version.CheckClientVersion(version.Version)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
})
t.Run("Fails when cli version is not latest", func(t *testing.T) {
version.Version = "v0.1.1"
mockPublicApi := createMockPublicApi("v0.3.0")
versionStatusChecker := version.NewVersionStatusChecker("http://localhost:23456/", "", mockPublicApi)
checks := versionStatusChecker.SelfCheck()
expectedStatus := healthcheckPb.CheckStatus_FAIL
if checks[0].Status != expectedStatus {
t.Fatalf("Expecting check status to be [%d], got [%d]", expectedStatus, checks[0].Status)
t.Run("Fails when client version does not match", func(t *testing.T) {
err := version.CheckClientVersion(version.Version + "latest")
if err == nil {
t.Fatalf("Expected error, got none")
}
})
}
expectedMessage := "is running version v0.1.1 but the latest version is v0.3.0"
if checks[0].FriendlyMessageToUser != expectedMessage {
t.Fatalf("Expecting message to be [%s], got [%s]", expectedMessage, checks[0].FriendlyMessageToUser)
func TestCheckServerVersion(t *testing.T) {
t.Run("Passes when server version matches", func(t *testing.T) {
apiClient := createMockPublicApi(version.Version)
err := version.CheckServerVersion(apiClient, version.Version)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
})
t.Run("Fails when control plane version is not latest", func(t *testing.T) {
version.Version = "v0.3.0"
mockPublicApi := createMockPublicApi("v0.1.1")
versionStatusChecker := version.NewVersionStatusChecker("http://localhost:23456/", "", mockPublicApi)
checks := versionStatusChecker.SelfCheck()
expectedStatus := healthcheckPb.CheckStatus_FAIL
if checks[1].Status != expectedStatus {
t.Fatalf("Expecting check status to be [%d], got [%d]", expectedStatus, checks[1].Status)
}
expectedMessage := "is running version v0.1.1 but the latest version is v0.3.0"
if checks[1].FriendlyMessageToUser != expectedMessage {
t.Fatalf("Expecting message to be [%s], got [%s]", expectedMessage, checks[1].FriendlyMessageToUser)
}
})
t.Run("Supports overriding the expected version", func(t *testing.T) {
version.Version = "customversion"
mockPublicApi := createMockPublicApi("customversion")
versionStatusChecker := version.NewVersionStatusChecker("http://localhost:23456/", "customversion", mockPublicApi)
checks := versionStatusChecker.SelfCheck()
for _, check := range checks {
if check.Status != healthcheckPb.CheckStatus_OK {
t.Errorf("Expecting check for [%s] to be [%s], got [%s]",
check.CheckDescription, healthcheckPb.CheckStatus_OK, check.Status)
}
t.Run("Fails when server version does not match", func(t *testing.T) {
apiClient := createMockPublicApi(version.Version + "latest")
err := version.CheckServerVersion(apiClient, version.Version)
if err == nil {
t.Fatalf("Expected error, got none")
}
})
}

View File

@ -1,9 +1,12 @@
kubernetes-api: can initialize the client..................................[ok]
kubernetes-api: can query the Kubernetes API...............................[ok]
kubernetes-api: is running the minimum Kubernetes API version..............[ok]
linkerd-api[namespace]: control plane namespace exists.....................[ok]
linkerd-api: control plane namespace exists................................[ok]
linkerd-api: can initialize the client.....................................[ok]
linkerd-api: can query the control plane API...............................[ok]
linkerd-api[kubernetes]: control plane can talk to Kubernetes..............[ok]
linkerd-api[prometheus]: control plane can talk to Prometheus..............[ok]
linkerd-version: can determine the latest version..........................[ok]
linkerd-version: cli is up-to-date.........................................[ok]
linkerd-version: control plane is up-to-date...............................[ok]