diff --git a/multicluster/charts/linkerd-multicluster/README.md b/multicluster/charts/linkerd-multicluster/README.md index 79d9e05d6..c854f2ab5 100644 --- a/multicluster/charts/linkerd-multicluster/README.md +++ b/multicluster/charts/linkerd-multicluster/README.md @@ -80,9 +80,11 @@ Kubernetes: `>=1.22.0-0` | gateway.nodeSelector | object | `{}` | Node selectors for the gateway pod | | gateway.pauseImage | string | `"gcr.io/google_containers/pause:3.2"` | The pause container to use | | gateway.port | int | `4143` | The port on which all the gateway will accept incoming traffic | +| gateway.probe.failureThreshold | int | `3` | Minimum consecutive failures for the probe to be considered failed | | gateway.probe.path | string | `"/ready"` | The path that will be used by remote clusters for determining whether the gateway is alive | | gateway.probe.port | int | `4191` | The port used for liveliness probing | | gateway.probe.seconds | int | `3` | The interval (in seconds) between liveness probes | +| gateway.probe.timeout | string | `"30s"` | Probe request timeout (in go's time.Duration format) | | gateway.replicas | int | `1` | Number of replicas for the gateway pod | | gateway.serviceAnnotations | object | `{}` | Annotations to add to the gateway service | | gateway.serviceExternalTrafficPolicy | string | `""` | Set externalTrafficPolicy on gateway service | diff --git a/multicluster/charts/linkerd-multicluster/templates/gateway.yaml b/multicluster/charts/linkerd-multicluster/templates/gateway.yaml index 66b8b0a08..963aac574 100644 --- a/multicluster/charts/linkerd-multicluster/templates/gateway.yaml +++ b/multicluster/charts/linkerd-multicluster/templates/gateway.yaml @@ -102,8 +102,10 @@ metadata: {{- with .Values.commonLabels }}{{ toYaml . | trim | nindent 4 }}{{- end }} annotations: mirror.linkerd.io/gateway-identity: {{.Values.gateway.name}}.{{.Release.Namespace}}.serviceaccount.identity.{{.Values.linkerdNamespace}}.{{.Values.identityTrustDomain}} + mirror.linkerd.io/probe-failure-threshold: "{{.Values.gateway.probe.failureThreshold}}" mirror.linkerd.io/probe-period: "{{.Values.gateway.probe.seconds}}" mirror.linkerd.io/probe-path: {{.Values.gateway.probe.path}} + mirror.linkerd.io/probe-timeout: "{{.Values.gateway.probe.timeout}}" mirror.linkerd.io/multicluster-gateway: "true" component: gateway {{ include "partials.annotations.created-by" . }} diff --git a/multicluster/charts/linkerd-multicluster/templates/link-crd.yaml b/multicluster/charts/linkerd-multicluster/templates/link-crd.yaml index 4ff9946da..945de8fa7 100644 --- a/multicluster/charts/linkerd-multicluster/templates/link-crd.yaml +++ b/multicluster/charts/linkerd-multicluster/templates/link-crd.yaml @@ -40,6 +40,10 @@ spec: description: Spec for gateway health probe type: object properties: + failureThreshold: + default: "3" + description: Minimum consecutive failures for the probe to be considered failed + type: string path: description: Path of remote gateway health endpoint type: string @@ -49,6 +53,11 @@ spec: port: description: Port of remote gateway health endpoint type: string + timeout: + default: 30s + description: Probe request timeout + format: duration + type: string selector: description: Kubernetes Label Selector type: object diff --git a/multicluster/charts/linkerd-multicluster/values.yaml b/multicluster/charts/linkerd-multicluster/values.yaml index 143316669..8c09f541b 100644 --- a/multicluster/charts/linkerd-multicluster/values.yaml +++ b/multicluster/charts/linkerd-multicluster/values.yaml @@ -12,6 +12,8 @@ gateway: # nodePort -- Set the gateway nodePort (for LoadBalancer or NodePort) to a specific value # nodePort: probe: + # -- Minimum consecutive failures for the probe to be considered failed + failureThreshold: 3 # -- The path that will be used by remote clusters for determining whether the # gateway is alive path: /ready @@ -21,6 +23,8 @@ gateway: # nodePort: # -- The interval (in seconds) between liveness probes seconds: 3 + # -- Probe request timeout (in go's time.Duration format) + timeout: 30s # -- Annotations to add to the gateway service serviceAnnotations: {} # -- Set externalTrafficPolicy on gateway service diff --git a/multicluster/cmd/testdata/install_default.golden b/multicluster/cmd/testdata/install_default.golden index 5c9237d2c..28a77de3e 100644 --- a/multicluster/cmd/testdata/install_default.golden +++ b/multicluster/cmd/testdata/install_default.golden @@ -68,8 +68,10 @@ metadata: linkerd.io/extension: multicluster annotations: mirror.linkerd.io/gateway-identity: linkerd-gateway.linkerd-multicluster.serviceaccount.identity.linkerd.cluster.local + mirror.linkerd.io/probe-failure-threshold: "3" mirror.linkerd.io/probe-period: "3" mirror.linkerd.io/probe-path: /ready + mirror.linkerd.io/probe-timeout: "30s" mirror.linkerd.io/multicluster-gateway: "true" component: gateway linkerd.io/created-by: linkerd/helm linkerdVersionValue @@ -277,6 +279,10 @@ spec: description: Spec for gateway health probe type: object properties: + failureThreshold: + default: "3" + description: Minimum consecutive failures for the probe to be considered failed + type: string path: description: Path of remote gateway health endpoint type: string @@ -286,6 +292,11 @@ spec: port: description: Port of remote gateway health endpoint type: string + timeout: + default: 30s + description: Probe request timeout + format: duration + type: string selector: description: Kubernetes Label Selector type: object diff --git a/multicluster/cmd/testdata/install_ha.golden b/multicluster/cmd/testdata/install_ha.golden index 939ab7a5d..a1025bbdd 100644 --- a/multicluster/cmd/testdata/install_ha.golden +++ b/multicluster/cmd/testdata/install_ha.golden @@ -106,8 +106,10 @@ metadata: linkerd.io/extension: multicluster annotations: mirror.linkerd.io/gateway-identity: linkerd-gateway.linkerd-multicluster.serviceaccount.identity.linkerd.cluster.local + mirror.linkerd.io/probe-failure-threshold: "3" mirror.linkerd.io/probe-period: "3" mirror.linkerd.io/probe-path: /ready + mirror.linkerd.io/probe-timeout: "30s" mirror.linkerd.io/multicluster-gateway: "true" component: gateway linkerd.io/created-by: linkerd/helm linkerdVersionValue @@ -349,6 +351,10 @@ spec: description: Spec for gateway health probe type: object properties: + failureThreshold: + default: "3" + description: Minimum consecutive failures for the probe to be considered failed + type: string path: description: Path of remote gateway health endpoint type: string @@ -358,6 +364,11 @@ spec: port: description: Port of remote gateway health endpoint type: string + timeout: + default: 30s + description: Probe request timeout + format: duration + type: string selector: description: Kubernetes Label Selector type: object diff --git a/multicluster/cmd/testdata/install_psp.golden b/multicluster/cmd/testdata/install_psp.golden index 74373d53d..1db3da2ff 100644 --- a/multicluster/cmd/testdata/install_psp.golden +++ b/multicluster/cmd/testdata/install_psp.golden @@ -68,8 +68,10 @@ metadata: linkerd.io/extension: multicluster annotations: mirror.linkerd.io/gateway-identity: linkerd-gateway.linkerd-multicluster.serviceaccount.identity.linkerd.cluster.local + mirror.linkerd.io/probe-failure-threshold: "3" mirror.linkerd.io/probe-period: "3" mirror.linkerd.io/probe-path: /ready + mirror.linkerd.io/probe-timeout: "30s" mirror.linkerd.io/multicluster-gateway: "true" component: gateway linkerd.io/created-by: linkerd/helm linkerdVersionValue @@ -311,6 +313,10 @@ spec: description: Spec for gateway health probe type: object properties: + failureThreshold: + default: "3" + description: Minimum consecutive failures for the probe to be considered failed + type: string path: description: Path of remote gateway health endpoint type: string @@ -320,6 +326,11 @@ spec: port: description: Port of remote gateway health endpoint type: string + timeout: + default: 30s + description: Probe request timeout + format: duration + type: string selector: description: Kubernetes Label Selector type: object diff --git a/multicluster/service-mirror/jittered_ticker.go b/multicluster/service-mirror/jittered_ticker.go index 880158b30..90f05a713 100644 --- a/multicluster/service-mirror/jittered_ticker.go +++ b/multicluster/service-mirror/jittered_ticker.go @@ -38,7 +38,7 @@ func NewTicker(minDuration time.Duration, maxJitter time.Duration) *Ticker { if maxJitter < 0 { log.WithField("jitter", minDuration).Panic("Negative jitter") } - c := make(chan time.Time, 1) + c := make(chan time.Time) ticker := &Ticker{ C: c, stop: make(chan bool), diff --git a/multicluster/service-mirror/probe_worker.go b/multicluster/service-mirror/probe_worker.go index 9e7de9ad1..33bc947f0 100644 --- a/multicluster/service-mirror/probe_worker.go +++ b/multicluster/service-mirror/probe_worker.go @@ -13,8 +13,6 @@ import ( logging "github.com/sirupsen/logrus" ) -const httpGatewayTimeoutMillis = 50000 - // ProbeWorker is responsible for monitoring gateways using a probe specification type ProbeWorker struct { localGatewayName string @@ -65,76 +63,81 @@ func (pw *ProbeWorker) Start() { } func (pw *ProbeWorker) run() { + successLabel := prometheus.Labels{probeSuccessfulLabel: "true"} + notSuccessLabel := prometheus.Labels{probeSuccessfulLabel: "false"} + probeTickerPeriod := pw.probeSpec.Period maxJitter := pw.probeSpec.Period / 10 // max jitter is 10% of period probeTicker := NewTicker(probeTickerPeriod, maxJitter) defer probeTicker.Stop() + var failures uint32 = 0 + probeLoop: for { select { case <-pw.stopCh: break probeLoop case <-probeTicker.C: - pw.doProbe() + start := time.Now() + if err := pw.doProbe(); err != nil { + pw.log.Warn(err) + failures++ + if failures < pw.probeSpec.FailureThreshold { + continue probeLoop + } + + pw.log.Warnf("Failure threshold (%d) reached - Marking as unhealthy", pw.probeSpec.FailureThreshold) + pw.metrics.alive.Set(0) + pw.metrics.probes.With(notSuccessLabel).Inc() + if pw.alive { + pw.alive = false + pw.Liveness <- false + } + } else { + end := time.Since(start) + failures = 0 + + pw.log.Debug("Gateway is healthy") + pw.metrics.alive.Set(1) + pw.metrics.latency.Set(float64(end.Milliseconds())) + pw.metrics.latencies.Observe(float64(end.Milliseconds())) + pw.metrics.probes.With(successLabel).Inc() + if !pw.alive { + pw.alive = true + pw.Liveness <- true + } + } } } } -func (pw *ProbeWorker) doProbe() { +func (pw *ProbeWorker) doProbe() error { pw.RLock() defer pw.RUnlock() - successLabel := prometheus.Labels{probeSuccessfulLabel: "true"} - notSuccessLabel := prometheus.Labels{probeSuccessfulLabel: "false"} - client := http.Client{ - Timeout: httpGatewayTimeoutMillis * time.Millisecond, + Timeout: pw.probeSpec.Timeout, } strPort := strconv.Itoa(int(pw.probeSpec.Port)) urlAddress := net.JoinHostPort(pw.localGatewayName, strPort) req, err := http.NewRequest("GET", fmt.Sprintf("http://%s%s", urlAddress, pw.probeSpec.Path), nil) if err != nil { - pw.log.Errorf("Could not create a GET request to gateway: %s", err) - return + return fmt.Errorf("could not create a GET request to gateway: %w", err) } - start := time.Now() resp, err := client.Do(req) - end := time.Since(start) if err != nil { - pw.log.Warnf("Problem connecting with gateway. Marking as unhealthy %s", err) - pw.metrics.alive.Set(0) - pw.metrics.probes.With(notSuccessLabel).Inc() - if pw.alive { - pw.alive = false - pw.Liveness <- false - } - return + return fmt.Errorf("problem connecting with gateway: %w", err) } if resp.StatusCode != 200 { - pw.log.Warnf("Gateway returned unexpected status %d. Marking as unhealthy", resp.StatusCode) - pw.metrics.alive.Set(0) - pw.metrics.probes.With(notSuccessLabel).Inc() - if pw.alive { - pw.alive = false - pw.Liveness <- false - } - } else { - pw.log.Debug("Gateway is healthy") - pw.metrics.alive.Set(1) - pw.metrics.latency.Set(float64(end.Milliseconds())) - pw.metrics.latencies.Observe(float64(end.Milliseconds())) - pw.metrics.probes.With(successLabel).Inc() - if !pw.alive { - pw.alive = true - pw.Liveness <- true - } + return fmt.Errorf("gateway returned unexpected status %d", resp.StatusCode) } if err := resp.Body.Close(); err != nil { pw.log.Warnf("Failed to close response body %s", err) } + return nil } diff --git a/multicluster/values/values.go b/multicluster/values/values.go index 81023c015..b7eb95504 100644 --- a/multicluster/values/values.go +++ b/multicluster/values/values.go @@ -62,10 +62,12 @@ type Gateway struct { // Probe contains all options for the Probe Service type Probe struct { - Path string `json:"path"` - Port uint32 `json:"port"` - NodePort uint32 `json:"nodePort"` - Seconds uint32 `json:"seconds"` + FailureThreshold uint32 `json:"failureThreshold"` + Path string `json:"path"` + Port uint32 `json:"port"` + NodePort uint32 `json:"nodePort"` + Seconds uint32 `json:"seconds"` + Timeout string `json:"timeout"` } // NewInstallValues returns a new instance of the Values type. diff --git a/pkg/k8s/labels.go b/pkg/k8s/labels.go index de17415a0..d4c0465ae 100644 --- a/pkg/k8s/labels.go +++ b/pkg/k8s/labels.go @@ -467,12 +467,18 @@ const ( // GatewayIdentity can be found on the remote gateway service GatewayIdentity = SvcMirrorPrefix + "/gateway-identity" + // GatewayProbeFailureThreshold is the minimum consecutive failures for the probe to be considered failed + GatewayProbeFailureThreshold = SvcMirrorPrefix + "/probe-failure-threshold" + // GatewayProbePeriod the interval at which the health of the gateway should be probed GatewayProbePeriod = SvcMirrorPrefix + "/probe-period" // GatewayProbePath the path at which the health of the gateway should be probed GatewayProbePath = SvcMirrorPrefix + "/probe-path" + // GatewayProbeTimeout is the probe request timeout + GatewayProbeTimeout = SvcMirrorPrefix + "/probe-timeout" + // ConfigKeyName is the key in the secret that stores the kubeconfig needed to connect // to a remote cluster ConfigKeyName = "kubeconfig" diff --git a/pkg/multicluster/link.go b/pkg/multicluster/link.go index 49b36846d..61a094342 100644 --- a/pkg/multicluster/link.go +++ b/pkg/multicluster/link.go @@ -17,14 +17,19 @@ import ( "k8s.io/client-go/dynamic" ) +const DefaultFailureThreshold = 3 +const DefaultProbeTimeout = "30s" + type ( // ProbeSpec defines how a gateway should be queried for health. Once per // period, the probe workers will send an HTTP request to the remote gateway // on the given port with the given path and expect a HTTP 200 response. ProbeSpec struct { - Path string - Port uint32 - Period time.Duration + FailureThreshold uint32 + Path string + Port uint32 + Period time.Duration + Timeout time.Duration } // Link is an internal representation of the link.multicluster.linkerd.io @@ -176,9 +181,11 @@ func (l Link) ToUnstructured() (unstructured.Unstructured, error) { "gatewayPort": fmt.Sprintf("%d", l.GatewayPort), "gatewayIdentity": l.GatewayIdentity, "probeSpec": map[string]interface{}{ - "path": l.ProbeSpec.Path, - "port": fmt.Sprintf("%d", l.ProbeSpec.Port), - "period": l.ProbeSpec.Period.String(), + "failureThreshold": fmt.Sprintf("%d", l.ProbeSpec.FailureThreshold), + "path": l.ProbeSpec.Path, + "port": fmt.Sprintf("%d", l.ProbeSpec.Port), + "period": l.ProbeSpec.Period.String(), + "timeout": l.ProbeSpec.Timeout.String(), }, } @@ -219,6 +226,17 @@ func (l Link) ToUnstructured() (unstructured.Unstructured, error) { // ExtractProbeSpec parses the ProbSpec from a gateway service's annotations. func ExtractProbeSpec(gateway *corev1.Service) (ProbeSpec, error) { + // older gateways might not have this field + failureThreshold := uint64(DefaultFailureThreshold) + failureThresholdStr := gateway.Annotations[k8s.GatewayProbeFailureThreshold] + if failureThresholdStr != "" { + var err error + failureThreshold, err = strconv.ParseUint(failureThresholdStr, 10, 32) + if err != nil { + return ProbeSpec{}, err + } + } + path := gateway.Annotations[k8s.GatewayProbePath] if path == "" { return ProbeSpec{}, errors.New("probe path is empty") @@ -234,10 +252,22 @@ func ExtractProbeSpec(gateway *corev1.Service) (ProbeSpec, error) { return ProbeSpec{}, err } + timeoutStr := gateway.Annotations[k8s.GatewayProbeTimeout] + if timeoutStr == "" { + timeoutStr = DefaultProbeTimeout + } + + timeout, err := time.ParseDuration(timeoutStr) + if err != nil { + return ProbeSpec{}, err + } + return ProbeSpec{ - Path: path, - Port: port, - Period: time.Duration(period) * time.Second, + FailureThreshold: uint32(failureThreshold), + Path: path, + Port: port, + Period: time.Duration(period) * time.Second, + Timeout: timeout, }, nil } @@ -294,6 +324,24 @@ func newProbeSpec(obj map[string]interface{}) (ProbeSpec, error) { return ProbeSpec{}, err } + failureThresholdStr, err := stringField(obj, "failureThreshold") + if err != nil { + return ProbeSpec{}, err + } + failureThreshold, err := strconv.ParseUint(failureThresholdStr, 10, 32) + if err != nil { + return ProbeSpec{}, err + } + + timeoutStr, err := stringField(obj, "timeout") + if err != nil { + return ProbeSpec{}, err + } + timeout, err := time.ParseDuration(timeoutStr) + if err != nil { + return ProbeSpec{}, err + } + path, err := stringField(obj, "path") if err != nil { return ProbeSpec{}, err @@ -309,9 +357,11 @@ func newProbeSpec(obj map[string]interface{}) (ProbeSpec, error) { } return ProbeSpec{ - Path: path, - Port: uint32(port), - Period: period, + FailureThreshold: uint32(failureThreshold), + Path: path, + Port: uint32(port), + Period: period, + Timeout: timeout, }, nil }