Add timeout and failureThreshold to multicluster probe (#13061)

* Add timeout and failureThreshold to multicluster probe

- This adds the `probeSpec.failureThreshold` and `probeSpec.timeout` fields to the Link CRD spec.
- Likewise, the `gateway.probe.failureThreshold` and `gateway.probe.timeout` fields are added to the linkerd-multicluster chart, that are used to populate the new `mirror.linkerd.io/probe-failure-threshold` and `mirror.linkerd.io/probe-timeout` annotations in the gateway service (consumed by `linkerd mc link` to populate probe spec).
- In the probe worker, we replace the hard-coded 50s timeout with the new timeout config (which now defaults to 30s). And the probe loop got refactored in order to not mark the gateway as unhealthy until the consecutive failures threshold is reached.

* Make probeTicker.C synchronous
This commit is contained in:
Alejandro Pedraza 2024-09-25 08:06:31 -05:00 committed by GitHub
parent 995e51f865
commit 85222ddba3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 165 additions and 54 deletions

View File

@ -80,9 +80,11 @@ Kubernetes: `>=1.22.0-0`
| gateway.nodeSelector | object | `{}` | Node selectors for the gateway pod |
| gateway.pauseImage | string | `"gcr.io/google_containers/pause:3.2"` | The pause container to use |
| gateway.port | int | `4143` | The port on which all the gateway will accept incoming traffic |
| gateway.probe.failureThreshold | int | `3` | Minimum consecutive failures for the probe to be considered failed |
| gateway.probe.path | string | `"/ready"` | The path that will be used by remote clusters for determining whether the gateway is alive |
| gateway.probe.port | int | `4191` | The port used for liveness probing |
| gateway.probe.seconds | int | `3` | The interval (in seconds) between liveness probes |
| gateway.probe.timeout | string | `"30s"` | Probe request timeout (in go's time.Duration format) |
| gateway.replicas | int | `1` | Number of replicas for the gateway pod |
| gateway.serviceAnnotations | object | `{}` | Annotations to add to the gateway service |
| gateway.serviceExternalTrafficPolicy | string | `""` | Set externalTrafficPolicy on gateway service |

View File

@ -102,8 +102,10 @@ metadata:
{{- with .Values.commonLabels }}{{ toYaml . | trim | nindent 4 }}{{- end }}
annotations:
mirror.linkerd.io/gateway-identity: {{.Values.gateway.name}}.{{.Release.Namespace}}.serviceaccount.identity.{{.Values.linkerdNamespace}}.{{.Values.identityTrustDomain}}
mirror.linkerd.io/probe-failure-threshold: "{{.Values.gateway.probe.failureThreshold}}"
mirror.linkerd.io/probe-period: "{{.Values.gateway.probe.seconds}}"
mirror.linkerd.io/probe-path: {{.Values.gateway.probe.path}}
mirror.linkerd.io/probe-timeout: "{{.Values.gateway.probe.timeout}}"
mirror.linkerd.io/multicluster-gateway: "true"
component: gateway
{{ include "partials.annotations.created-by" . }}

View File

@ -40,6 +40,10 @@ spec:
description: Spec for gateway health probe
type: object
properties:
failureThreshold:
default: "3"
description: Minimum consecutive failures for the probe to be considered failed
type: string
path:
description: Path of remote gateway health endpoint
type: string
@ -49,6 +53,11 @@ spec:
port:
description: Port of remote gateway health endpoint
type: string
timeout:
default: 30s
description: Probe request timeout
format: duration
type: string
selector:
description: Kubernetes Label Selector
type: object

View File

@ -12,6 +12,8 @@ gateway:
# nodePort -- Set the gateway nodePort (for LoadBalancer or NodePort) to a specific value
# nodePort:
probe:
# -- Minimum consecutive failures for the probe to be considered failed
failureThreshold: 3
# -- The path that will be used by remote clusters for determining whether the
# gateway is alive
path: /ready
@ -21,6 +23,8 @@ gateway:
# nodePort:
# -- The interval (in seconds) between liveness probes
seconds: 3
# -- Probe request timeout (in go's time.Duration format)
timeout: 30s
# -- Annotations to add to the gateway service
serviceAnnotations: {}
# -- Set externalTrafficPolicy on gateway service

View File

@ -68,8 +68,10 @@ metadata:
linkerd.io/extension: multicluster
annotations:
mirror.linkerd.io/gateway-identity: linkerd-gateway.linkerd-multicluster.serviceaccount.identity.linkerd.cluster.local
mirror.linkerd.io/probe-failure-threshold: "3"
mirror.linkerd.io/probe-period: "3"
mirror.linkerd.io/probe-path: /ready
mirror.linkerd.io/probe-timeout: "30s"
mirror.linkerd.io/multicluster-gateway: "true"
component: gateway
linkerd.io/created-by: linkerd/helm linkerdVersionValue
@ -277,6 +279,10 @@ spec:
description: Spec for gateway health probe
type: object
properties:
failureThreshold:
default: "3"
description: Minimum consecutive failures for the probe to be considered failed
type: string
path:
description: Path of remote gateway health endpoint
type: string
@ -286,6 +292,11 @@ spec:
port:
description: Port of remote gateway health endpoint
type: string
timeout:
default: 30s
description: Probe request timeout
format: duration
type: string
selector:
description: Kubernetes Label Selector
type: object

View File

@ -106,8 +106,10 @@ metadata:
linkerd.io/extension: multicluster
annotations:
mirror.linkerd.io/gateway-identity: linkerd-gateway.linkerd-multicluster.serviceaccount.identity.linkerd.cluster.local
mirror.linkerd.io/probe-failure-threshold: "3"
mirror.linkerd.io/probe-period: "3"
mirror.linkerd.io/probe-path: /ready
mirror.linkerd.io/probe-timeout: "30s"
mirror.linkerd.io/multicluster-gateway: "true"
component: gateway
linkerd.io/created-by: linkerd/helm linkerdVersionValue
@ -349,6 +351,10 @@ spec:
description: Spec for gateway health probe
type: object
properties:
failureThreshold:
default: "3"
description: Minimum consecutive failures for the probe to be considered failed
type: string
path:
description: Path of remote gateway health endpoint
type: string
@ -358,6 +364,11 @@ spec:
port:
description: Port of remote gateway health endpoint
type: string
timeout:
default: 30s
description: Probe request timeout
format: duration
type: string
selector:
description: Kubernetes Label Selector
type: object

View File

@ -68,8 +68,10 @@ metadata:
linkerd.io/extension: multicluster
annotations:
mirror.linkerd.io/gateway-identity: linkerd-gateway.linkerd-multicluster.serviceaccount.identity.linkerd.cluster.local
mirror.linkerd.io/probe-failure-threshold: "3"
mirror.linkerd.io/probe-period: "3"
mirror.linkerd.io/probe-path: /ready
mirror.linkerd.io/probe-timeout: "30s"
mirror.linkerd.io/multicluster-gateway: "true"
component: gateway
linkerd.io/created-by: linkerd/helm linkerdVersionValue
@ -311,6 +313,10 @@ spec:
description: Spec for gateway health probe
type: object
properties:
failureThreshold:
default: "3"
description: Minimum consecutive failures for the probe to be considered failed
type: string
path:
description: Path of remote gateway health endpoint
type: string
@ -320,6 +326,11 @@ spec:
port:
description: Port of remote gateway health endpoint
type: string
timeout:
default: 30s
description: Probe request timeout
format: duration
type: string
selector:
description: Kubernetes Label Selector
type: object

View File

@ -38,7 +38,7 @@ func NewTicker(minDuration time.Duration, maxJitter time.Duration) *Ticker {
if maxJitter < 0 {
log.WithField("jitter", minDuration).Panic("Negative jitter")
}
c := make(chan time.Time, 1)
c := make(chan time.Time)
ticker := &Ticker{
C: c,
stop: make(chan bool),

View File

@ -13,8 +13,6 @@ import (
logging "github.com/sirupsen/logrus"
)
const httpGatewayTimeoutMillis = 50000
// ProbeWorker is responsible for monitoring gateways using a probe specification
type ProbeWorker struct {
localGatewayName string
@ -65,76 +63,81 @@ func (pw *ProbeWorker) Start() {
}
// run is the probe worker's main loop. Once per period (with up to 10%
// jitter added by the ticker) it probes the remote gateway, marking it
// unhealthy only after FailureThreshold consecutive failures, and healthy
// again on the first subsequent success. It exits when stopCh is closed.
func (pw *ProbeWorker) run() {
	successLabel := prometheus.Labels{probeSuccessfulLabel: "true"}
	notSuccessLabel := prometheus.Labels{probeSuccessfulLabel: "false"}
	probeTickerPeriod := pw.probeSpec.Period
	maxJitter := pw.probeSpec.Period / 10 // max jitter is 10% of period
	probeTicker := NewTicker(probeTickerPeriod, maxJitter)
	defer probeTicker.Stop()
	var failures uint32 = 0
probeLoop:
	for {
		select {
		case <-pw.stopCh:
			break probeLoop
		case <-probeTicker.C:
			// Time the probe so latency metrics can be recorded on success.
			start := time.Now()
			if err := pw.doProbe(); err != nil {
				pw.log.Warn(err)
				failures++
				// Tolerate failures below the threshold without changing
				// the reported health state or metrics.
				if failures < pw.probeSpec.FailureThreshold {
					continue probeLoop
				}
				pw.log.Warnf("Failure threshold (%d) reached - Marking as unhealthy", pw.probeSpec.FailureThreshold)
				pw.metrics.alive.Set(0)
				pw.metrics.probes.With(notSuccessLabel).Inc()
				// Only emit a Liveness event on a state transition.
				if pw.alive {
					pw.alive = false
					pw.Liveness <- false
				}
			} else {
				end := time.Since(start)
				// Any success resets the consecutive-failure counter.
				failures = 0
				pw.log.Debug("Gateway is healthy")
				pw.metrics.alive.Set(1)
				pw.metrics.latency.Set(float64(end.Milliseconds()))
				pw.metrics.latencies.Observe(float64(end.Milliseconds()))
				pw.metrics.probes.With(successLabel).Inc()
				// Only emit a Liveness event on a state transition.
				if !pw.alive {
					pw.alive = true
					pw.Liveness <- true
				}
			}
		}
	}
}
func (pw *ProbeWorker) doProbe() {
func (pw *ProbeWorker) doProbe() error {
pw.RLock()
defer pw.RUnlock()
successLabel := prometheus.Labels{probeSuccessfulLabel: "true"}
notSuccessLabel := prometheus.Labels{probeSuccessfulLabel: "false"}
client := http.Client{
Timeout: httpGatewayTimeoutMillis * time.Millisecond,
Timeout: pw.probeSpec.Timeout,
}
strPort := strconv.Itoa(int(pw.probeSpec.Port))
urlAddress := net.JoinHostPort(pw.localGatewayName, strPort)
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s%s", urlAddress, pw.probeSpec.Path), nil)
if err != nil {
pw.log.Errorf("Could not create a GET request to gateway: %s", err)
return
return fmt.Errorf("could not create a GET request to gateway: %w", err)
}
start := time.Now()
resp, err := client.Do(req)
end := time.Since(start)
if err != nil {
pw.log.Warnf("Problem connecting with gateway. Marking as unhealthy %s", err)
pw.metrics.alive.Set(0)
pw.metrics.probes.With(notSuccessLabel).Inc()
if pw.alive {
pw.alive = false
pw.Liveness <- false
}
return
return fmt.Errorf("problem connecting with gateway: %w", err)
}
if resp.StatusCode != 200 {
pw.log.Warnf("Gateway returned unexpected status %d. Marking as unhealthy", resp.StatusCode)
pw.metrics.alive.Set(0)
pw.metrics.probes.With(notSuccessLabel).Inc()
if pw.alive {
pw.alive = false
pw.Liveness <- false
}
} else {
pw.log.Debug("Gateway is healthy")
pw.metrics.alive.Set(1)
pw.metrics.latency.Set(float64(end.Milliseconds()))
pw.metrics.latencies.Observe(float64(end.Milliseconds()))
pw.metrics.probes.With(successLabel).Inc()
if !pw.alive {
pw.alive = true
pw.Liveness <- true
}
return fmt.Errorf("gateway returned unexpected status %d", resp.StatusCode)
}
if err := resp.Body.Close(); err != nil {
pw.log.Warnf("Failed to close response body %s", err)
}
return nil
}

View File

@ -62,10 +62,12 @@ type Gateway struct {
// Probe contains all options for the Probe Service
type Probe struct {
	// FailureThreshold is the minimum number of consecutive failed probes
	// before the gateway is considered unhealthy.
	FailureThreshold uint32 `json:"failureThreshold"`
	// Path is the HTTP path probed on the remote gateway.
	Path string `json:"path"`
	// Port is the port of the remote gateway health endpoint.
	Port uint32 `json:"port"`
	// NodePort optionally pins the probe service's node port.
	NodePort uint32 `json:"nodePort"`
	// Seconds is the interval (in seconds) between probes.
	Seconds uint32 `json:"seconds"`
	// Timeout is the per-request probe timeout in Go time.Duration format
	// (e.g. "30s").
	Timeout string `json:"timeout"`
}
// NewInstallValues returns a new instance of the Values type.

View File

@ -467,12 +467,18 @@ const (
// GatewayIdentity can be found on the remote gateway service
GatewayIdentity = SvcMirrorPrefix + "/gateway-identity"
// GatewayProbeFailureThreshold is the minimum consecutive failures for the probe to be considered failed
GatewayProbeFailureThreshold = SvcMirrorPrefix + "/probe-failure-threshold"
// GatewayProbePeriod the interval at which the health of the gateway should be probed
GatewayProbePeriod = SvcMirrorPrefix + "/probe-period"
// GatewayProbePath the path at which the health of the gateway should be probed
GatewayProbePath = SvcMirrorPrefix + "/probe-path"
// GatewayProbeTimeout is the probe request timeout
GatewayProbeTimeout = SvcMirrorPrefix + "/probe-timeout"
// ConfigKeyName is the key in the secret that stores the kubeconfig needed to connect
// to a remote cluster
ConfigKeyName = "kubeconfig"

View File

@ -17,14 +17,19 @@ import (
"k8s.io/client-go/dynamic"
)
const DefaultFailureThreshold = 3
const DefaultProbeTimeout = "30s"
type (
// ProbeSpec defines how a gateway should be queried for health. Once per
// period, the probe workers will send an HTTP request to the remote gateway
// on the given port with the given path and expect a HTTP 200 response.
ProbeSpec struct {
Path string
Port uint32
Period time.Duration
FailureThreshold uint32
Path string
Port uint32
Period time.Duration
Timeout time.Duration
}
// Link is an internal representation of the link.multicluster.linkerd.io
@ -176,9 +181,11 @@ func (l Link) ToUnstructured() (unstructured.Unstructured, error) {
"gatewayPort": fmt.Sprintf("%d", l.GatewayPort),
"gatewayIdentity": l.GatewayIdentity,
"probeSpec": map[string]interface{}{
"path": l.ProbeSpec.Path,
"port": fmt.Sprintf("%d", l.ProbeSpec.Port),
"period": l.ProbeSpec.Period.String(),
"failureThreshold": fmt.Sprintf("%d", l.ProbeSpec.FailureThreshold),
"path": l.ProbeSpec.Path,
"port": fmt.Sprintf("%d", l.ProbeSpec.Port),
"period": l.ProbeSpec.Period.String(),
"timeout": l.ProbeSpec.Timeout.String(),
},
}
@ -219,6 +226,17 @@ func (l Link) ToUnstructured() (unstructured.Unstructured, error) {
// ExtractProbeSpec parses the ProbSpec from a gateway service's annotations.
func ExtractProbeSpec(gateway *corev1.Service) (ProbeSpec, error) {
// older gateways might not have this field
failureThreshold := uint64(DefaultFailureThreshold)
failureThresholdStr := gateway.Annotations[k8s.GatewayProbeFailureThreshold]
if failureThresholdStr != "" {
var err error
failureThreshold, err = strconv.ParseUint(failureThresholdStr, 10, 32)
if err != nil {
return ProbeSpec{}, err
}
}
path := gateway.Annotations[k8s.GatewayProbePath]
if path == "" {
return ProbeSpec{}, errors.New("probe path is empty")
@ -234,10 +252,22 @@ func ExtractProbeSpec(gateway *corev1.Service) (ProbeSpec, error) {
return ProbeSpec{}, err
}
timeoutStr := gateway.Annotations[k8s.GatewayProbeTimeout]
if timeoutStr == "" {
timeoutStr = DefaultProbeTimeout
}
timeout, err := time.ParseDuration(timeoutStr)
if err != nil {
return ProbeSpec{}, err
}
return ProbeSpec{
Path: path,
Port: port,
Period: time.Duration(period) * time.Second,
FailureThreshold: uint32(failureThreshold),
Path: path,
Port: port,
Period: time.Duration(period) * time.Second,
Timeout: timeout,
}, nil
}
@ -294,6 +324,24 @@ func newProbeSpec(obj map[string]interface{}) (ProbeSpec, error) {
return ProbeSpec{}, err
}
failureThresholdStr, err := stringField(obj, "failureThreshold")
if err != nil {
return ProbeSpec{}, err
}
failureThreshold, err := strconv.ParseUint(failureThresholdStr, 10, 32)
if err != nil {
return ProbeSpec{}, err
}
timeoutStr, err := stringField(obj, "timeout")
if err != nil {
return ProbeSpec{}, err
}
timeout, err := time.ParseDuration(timeoutStr)
if err != nil {
return ProbeSpec{}, err
}
path, err := stringField(obj, "path")
if err != nil {
return ProbeSpec{}, err
@ -309,9 +357,11 @@ func newProbeSpec(obj map[string]interface{}) (ProbeSpec, error) {
}
return ProbeSpec{
Path: path,
Port: uint32(port),
Period: period,
FailureThreshold: uint32(failureThreshold),
Path: path,
Port: uint32(port),
Period: period,
Timeout: timeout,
}, nil
}