901 lines
26 KiB
Go
901 lines
26 KiB
Go
package controller
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
apiv1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
|
|
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
|
|
"github.com/argoproj/argo-workflows/v3/util/logging"
|
|
)
|
|
|
|
// basicMetric is a Workflow manifest with a single container template,
// random-int, that declares one Prometheus gauge (duration_gauge, labelled
// name=random-int, value "{{duration}}") and one global output parameter.
var basicMetric = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world-
spec:
  entrypoint: random-int
  templates:
    - name: random-int
      metrics:
        prometheus:
          - name: duration_gauge
            labels:
              - key: name
                value: random-int
            help: "Duration gauge by name"
            gauge:
              value: "{{duration}}"
      outputs:
        parameters:
          - name: rand-int-value
            globalName: rand-int-value
            valueFrom:
              path: /tmp/rand_int.txt
      container:
        image: alpine:latest
        command: [sh, -c]
        args: ["RAND_INT=$((1 + RANDOM % 10)); echo $RAND_INT; echo $RAND_INT > /tmp/rand_int.txt"]
`
|
|
|
|
func TestBasicMetric(t *testing.T) {
|
|
cancel, controller := newController()
|
|
defer cancel()
|
|
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
|
|
wf := v1alpha1.MustUnmarshalWorkflow(basicMetric)
|
|
ctx := context.Background()
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
_, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
|
|
require.NoError(t, err)
|
|
woc := newWorkflowOperationCtx(ctx, wf, controller)
|
|
woc.operate(ctx)
|
|
|
|
// Schedule first pod and mark completed
|
|
woc = newWorkflowOperationCtx(ctx, woc.wf, controller)
|
|
woc.operate(ctx)
|
|
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
|
|
|
|
// Process first metrics
|
|
woc = newWorkflowOperationCtx(ctx, woc.wf, controller)
|
|
woc.operate(ctx)
|
|
|
|
metricName := wf.Spec.Templates[0].Metrics.Prometheus[0].Name
|
|
assert.True(t, controller.metrics.CustomMetricExists(metricName))
|
|
attribs := attribute.NewSet(attribute.String("name", "random-int"))
|
|
_, err = testExporter.GetFloat64GaugeValue(metricName, &attribs)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// gaugeMetric is a Workflow manifest whose whalesay template declares four
// Prometheus gauges, all labelled name=random-int: custom_gauge_add
// (operation Add, value "10"), custom_gauge_sub (operation Sub, value "5"),
// custom_gauge_set (operation Set, value "50") and custom_gauge_default
// (no operation, value "15").
var gaugeMetric = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: gauge-metric
spec:
  entrypoint: whalesay
  templates:
    - name: whalesay
      metrics:
        prometheus:
          - name: custom_gauge_add
            labels:
              - key: name
                value: random-int
            help: "A custom gauge"
            gauge:
              operation: Add
              value: "10"
          - name: custom_gauge_sub
            labels:
              - key: name
                value: random-int
            help: "A custom gauge"
            gauge:
              operation: Sub
              value: "5"
          - name: custom_gauge_set
            labels:
              - key: name
                value: random-int
            help: "A custom gauge"
            gauge:
              operation: Set
              value: "50"
          - name: custom_gauge_default
            labels:
              - key: name
                value: random-int
            help: "A custom gauge"
            gauge:
              value: "15"
      container:
        image: docker/whalesay:latest
        command: [cowsay]

`
|
|
|
|
func TestGaugeMetric(t *testing.T) {
|
|
wf := v1alpha1.MustUnmarshalWorkflow(gaugeMetric)
|
|
cancel, controller := newController(wf)
|
|
defer cancel()
|
|
|
|
// Schedule first pod and mark completed
|
|
ctx := context.Background()
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
woc := newWorkflowOperationCtx(ctx, wf, controller)
|
|
woc.operate(ctx)
|
|
makePodsPhase(ctx, woc, apiv1.PodFailed)
|
|
|
|
// Process first metrics
|
|
woc = newWorkflowOperationCtx(ctx, woc.wf, controller)
|
|
woc.operate(ctx)
|
|
|
|
attribs := attribute.NewSet(attribute.String("name", "random-int"))
|
|
|
|
valAdd, err := testExporter.GetFloat64GaugeValue(woc.wf.Spec.Templates[0].Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
assert.InEpsilon(t, float64(10.0), valAdd, 0.001)
|
|
|
|
valSub, err := testExporter.GetFloat64GaugeValue(woc.wf.Spec.Templates[0].Metrics.Prometheus[1].Name, &attribs)
|
|
require.NoError(t, err)
|
|
assert.InEpsilon(t, float64(-5.0), valSub, 0.001)
|
|
|
|
valSet, err := testExporter.GetFloat64GaugeValue(woc.wf.Spec.Templates[0].Metrics.Prometheus[2].Name, &attribs)
|
|
require.NoError(t, err)
|
|
assert.InEpsilon(t, float64(50.0), valSet, 0.001)
|
|
|
|
valDefault, err := testExporter.GetFloat64GaugeValue(woc.wf.Spec.Templates[0].Metrics.Prometheus[3].Name, &attribs)
|
|
require.NoError(t, err)
|
|
assert.InEpsilon(t, float64(15.0), valDefault, 0.001)
|
|
}
|
|
|
|
// counterMetric is a Workflow manifest whose whalesay template declares two
// Prometheus counters labelled name=flakey: execution_counter, and
// failure_counter which carries the condition "{{status}} == Failed".
var counterMetric = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: counter-metric
spec:
  entrypoint: whalesay
  templates:
    - name: whalesay
      metrics:
        prometheus:
          - name: execution_counter
            help: "How many times a step has executed"
            labels:
              - key: name
                value: flakey
            counter:
              value: "1"
          - name: failure_counter
            help: "How many times a step has failed"
            labels:
              - key: name
                value: flakey
            when: "{{status}} == Failed"
            counter:
              value: "1"
      container:
        image: docker/whalesay:latest
        command: [cowsay]

`
|
|
|
|
func TestCounterMetric(t *testing.T) {
|
|
wf := v1alpha1.MustUnmarshalWorkflow(counterMetric)
|
|
cancel, controller := newController(wf)
|
|
defer cancel()
|
|
|
|
// Schedule first pod and mark completed
|
|
ctx := context.Background()
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
woc := newWorkflowOperationCtx(ctx, wf, controller)
|
|
woc.operate(ctx)
|
|
makePodsPhase(ctx, woc, apiv1.PodFailed)
|
|
|
|
// Process first metrics
|
|
woc = newWorkflowOperationCtx(ctx, woc.wf, controller)
|
|
woc.operate(ctx)
|
|
|
|
attribs := attribute.NewSet(attribute.String("name", "flakey"))
|
|
|
|
valTotal, err := testExporter.GetFloat64CounterValue(woc.wf.Spec.Templates[0].Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
assert.InDelta(t, float64(1), valTotal, 0.001)
|
|
|
|
valError, err := testExporter.GetFloat64CounterValue(woc.wf.Spec.Templates[0].Metrics.Prometheus[1].Name, &attribs)
|
|
require.NoError(t, err)
|
|
assert.InDelta(t, float64(1), valError, 0.001)
|
|
}
|
|
|
|
// testMetricEmissionSameOperationCreationAndFailure is a Running Workflow
// manifest with three templates: steps-1 (calls steps-2), steps-2 (declares
// the unlabelled counter "failure" with the condition
// '{{status}} == Failed' and expands whalesay using the withParam value
// "mary had a little lamb"), and the whalesay container template.
var testMetricEmissionSameOperationCreationAndFailure = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  creationTimestamp: "2020-05-14T14:30:31Z"
  name: steps-s5rz4
spec:
  entrypoint: steps-1
  onExit: whalesay
  templates:
  - inputs: {}
    metadata: {}
    name: steps-1
    outputs: {}
    steps:
    - -
        name: hello2a
        template: steps-2
  - inputs: {}
    metadata: {}
    metrics:
      prometheus:
      - counter:
          value: "1"
        gauge: null
        help: Failure
        histogram: null
        labels: null
        name: failure
        when: '{{status}} == Failed'
    name: steps-2
    outputs: {}
    steps:
    - - name: hello1
        template: whalesay
        withParam: mary had a little lamb
  - container:
      args:
      - hello
      command:
      - cowsay
      image: docker/whalesay
      name: ""
      resources: {}
    inputs: {}
    metadata: {}
    name: whalesay
    outputs: {}
status:
  phase: Running
  startedAt: "2020-05-14T14:30:31Z"
`
|
|
|
|
func TestMetricEmissionSameOperationCreationAndFailure(t *testing.T) {
|
|
cancel, controller := newController()
|
|
defer cancel()
|
|
ctx := context.Background()
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
|
|
wf := v1alpha1.MustUnmarshalWorkflow(testMetricEmissionSameOperationCreationAndFailure)
|
|
_, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
|
|
require.NoError(t, err)
|
|
woc := newWorkflowOperationCtx(ctx, wf, controller)
|
|
|
|
woc.operate(ctx)
|
|
|
|
attribs := attribute.NewSet()
|
|
|
|
valError, err := testExporter.GetFloat64CounterValue(woc.wf.Spec.Templates[1].Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
assert.InDelta(t, float64(1), valError, 0.001)
|
|
}
|
|
|
|
// testRetryStrategyMetric is a Workflow manifest with two templates: the
// steps template whalesay (counter workflow_counter) which calls
// whalesay-template, and the container template whalesay-template (counter
// template_counter) which has a retryStrategy with limit "2".
var testRetryStrategyMetric = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: workflow-template-whalesay-9pk8f
spec:
  entrypoint: whalesay
  templates:
  - inputs: {}
    metadata: {}
    metrics:
      prometheus:
      - counter:
          value: "1"
        help: number of times the outer workflow was invoked
        name: workflow_counter
    name: whalesay
    outputs: {}
    steps:
    - - arguments:
          parameters:
          - name: message
            value: hello world
        name: call-whalesay-template
        template: whalesay-template
  - container:
      args:
      - '{{inputs.parameters.message}}'
      command:
      - cowsay
      image: docker/whalesay
      name: ""
      resources: {}
    inputs:
      parameters:
      - name: message
    metadata: {}
    metrics:
      prometheus:
      - counter:
          value: "1"
        help: number of times the template was executed
        name: template_counter
    name: whalesay-template
    outputs: {}
    retryStrategy:
      limit: "2"
`
|
|
|
|
func TestRetryStrategyMetric(t *testing.T) {
|
|
wf := v1alpha1.MustUnmarshalWorkflow(testRetryStrategyMetric)
|
|
cancel, controller := newController(wf)
|
|
defer cancel()
|
|
ctx := context.Background()
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
woc := newWorkflowOperationCtx(ctx, wf, controller)
|
|
woc.operate(ctx)
|
|
|
|
// Ensure no metrics have been emitted yet
|
|
metricErrorDesc := wf.Spec.Templates[0].Metrics.Prometheus[0].GetKey()
|
|
assert.Nil(t, controller.metrics.GetCustomMetric(metricErrorDesc))
|
|
metricErrorDesc = wf.Spec.Templates[1].Metrics.Prometheus[0].GetKey()
|
|
assert.Nil(t, controller.metrics.GetCustomMetric(metricErrorDesc))
|
|
|
|
// Simulate pod succeeded
|
|
podNode := woc.wf.Status.Nodes["workflow-template-whalesay-9pk8f-1966833540"]
|
|
podNode.Phase = v1alpha1.NodeSucceeded
|
|
woc.wf.Status.Nodes["workflow-template-whalesay-9pk8f-1966833540"] = podNode
|
|
woc = newWorkflowOperationCtx(ctx, woc.wf, controller)
|
|
woc.operate(ctx)
|
|
|
|
attribs := attribute.NewSet()
|
|
|
|
valWfError, err := testExporter.GetFloat64CounterValue(woc.wf.Spec.Templates[0].Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
assert.InDelta(t, float64(1.0), valWfError, 0.001)
|
|
|
|
valTplError, err := testExporter.GetFloat64CounterValue(woc.wf.Spec.Templates[1].Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
assert.InDelta(t, float64(1.0), valTplError, 0.001)
|
|
}
|
|
|
|
// dagTmplMetrics is a Workflow manifest whose DAG entrypoint "steps" runs two
// tasks. Template random-int declares a histogram
// (random_int_step_histogram_dag, value 5) and a realtime gauge
// (duration_gauge_dag); template flakey declares the counter
// result_counter_dag labelled name=flakey, status=Failed.
var dagTmplMetrics = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: hello-world-nl9bj
spec:
  entrypoint: steps
  templates:
  - dag:
      tasks:
      - name: random-int-dag
        template: random-int
      - name: flakey-dag
        template: flakey
    name: steps
    outputs: {}
  - container:
      args:
      - RAND_INT=$((1 + RANDOM % 10)); echo $RAND_INT; echo $RAND_INT > /tmp/rand_int.txt
      command:
      - sh
      - -c
      image: alpine:latest
      name: ""
      resources: {}
    inputs: {}
    metadata: {}
    metrics:
      prometheus:
      - help: Value of the int emitted by random-int at step level
        histogram:
          buckets:
          - 2.01
          - 4.01
          - 6.01
          - 8.01
          - 10.01
          value: 5
        name: random_int_step_histogram_dag
      - gauge:
          realtime: true
          value: '{{duration}}'
        help: Duration gauge by name
        labels:
        - key: name
          value: random-int
        name: duration_gauge_dag
    name: random-int
    outputs:
      parameters:
      - globalName: rand-int-value
        name: rand-int-value
        valueFrom:
          path: /tmp/rand_int.txt
  - container:
      args:
      - import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)
      command:
      - python
      - -c
      image: python:alpine3.6
      name: ""
      resources: {}
    inputs: {}
    metadata: {}
    metrics:
      prometheus:
      - counter:
          value: "1"
        help: Count of step execution by result status
        labels:
        - key: name
          value: flakey
        - key: status
          value: Failed
        name: result_counter_dag
    name: flakey
    outputs: {}
`
|
|
|
|
func TestDAGTmplMetrics(t *testing.T) {
|
|
cancel, controller := newController()
|
|
defer cancel()
|
|
ctx := context.Background()
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
|
|
wf := v1alpha1.MustUnmarshalWorkflow(dagTmplMetrics)
|
|
_, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
|
|
require.NoError(t, err)
|
|
woc := newWorkflowOperationCtx(ctx, wf, controller)
|
|
|
|
woc.operate(ctx)
|
|
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
|
|
woc.operate(ctx)
|
|
|
|
attribs := attribute.NewSet()
|
|
tmpl := woc.wf.GetTemplateByName("random-int")
|
|
assert.NotNil(t, tmpl)
|
|
|
|
val, err := testExporter.GetFloat64HistogramData(tmpl.Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
assert.InEpsilon(t, float64(5.0), val.Sum, 0.001)
|
|
assert.Equal(t, uint64(1), val.Count)
|
|
|
|
attribs = attribute.NewSet(attribute.String("name", "flakey"), attribute.String("status", "Failed"))
|
|
tmpl = woc.wf.GetTemplateByName("flakey")
|
|
assert.NotNil(t, tmpl)
|
|
valErrCount, err := testExporter.GetFloat64CounterValue(tmpl.Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
assert.InDelta(t, float64(1), valErrCount, 0.001)
|
|
}
|
|
|
|
// testRealtimeWorkflowMetric is a Workflow manifest (name test-foobar, label
// testLabel=foobar) that declares one workflow-level realtime gauge whose
// value is "{{ workflow.duration }}" and whose labels are templated from the
// workflow name and the testLabel label.
var testRealtimeWorkflowMetric = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: test-foobar
  labels:
    testLabel: foobar
spec:
  entrypoint: whalesay
  metrics:
    prometheus:
      - name: intuit_data_persistplat_dppselfservice_workflow_test_duration
        help: Duration of workflow
        labels:
          - key: workflowName
            value: "{{workflow.name}}"
          - key: label
            value: "{{workflow.labels.testLabel}}"
        gauge:
          realtime: true
          value: "{{ workflow.duration }}"
  templates:
    - name: whalesay
      container:
        image: docker/whalesay
        command: [ cowsay ]
        args: [ "hello world" ]
`
|
|
|
|
func TestRealtimeWorkflowMetric(t *testing.T) {
|
|
cancel, controller := newController()
|
|
defer cancel()
|
|
ctx := context.Background()
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
|
|
wf := v1alpha1.MustUnmarshalWorkflow(testRealtimeWorkflowMetric)
|
|
_, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
|
|
require.NoError(t, err)
|
|
woc := newWorkflowOperationCtx(ctx, wf, controller)
|
|
|
|
woc.operate(ctx)
|
|
|
|
attribs := attribute.NewSet(attribute.String("label", "foobar"), attribute.String("workflowName", "test-foobar"))
|
|
value, err := testExporter.GetFloat64GaugeValue(woc.wf.Spec.Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
value1, err := testExporter.GetFloat64GaugeValue(woc.wf.Spec.Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
t.Logf("%v new %v old", value1, value)
|
|
assert.Greater(t, value1, value)
|
|
|
|
woc.markWorkflowSuccess(ctx)
|
|
value2, err := testExporter.GetFloat64GaugeValue(woc.wf.Spec.Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
time.Sleep(10 * time.Millisecond)
|
|
value3, err := testExporter.GetFloat64GaugeValue(woc.wf.Spec.Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
// Duration should be same after workflow complete
|
|
assert.InEpsilon(t, value2, value3, 0.001)
|
|
}
|
|
|
|
// testRealtimeWorkflowMetricWithGlobalParameters is a Workflow manifest (name
// test-foobar, label testLabel=foobar) that passes the workflow argument
// testParam=foo and declares one workflow-level realtime gauge whose value is
// "{{workflow.duration}}".
var testRealtimeWorkflowMetricWithGlobalParameters = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: test-foobar
  labels:
    testLabel: foobar
spec:
  arguments:
    parameters:
      - name: testParam
        value: foo
  entrypoint: whalesay
  metrics:
    prometheus:
      - name: intuit_data_persistplat_dppselfservice_workflow_test_duration
        help: Duration of workflow
        labels:
          - key: workflowName
            value: "{{workflow.name}}"
          - key: label
            value: "{{workflow.labels.testLabel}}"
        gauge:
          realtime: true
          value: "{{workflow.duration}}"
  templates:
    - name: whalesay
      container:
        image: docker/whalesay
        command: [ cowsay ]
        args: [ "hello world" ]
`
|
|
|
|
func TestRealtimeWorkflowMetricWithGlobalParameters(t *testing.T) {
|
|
cancel, controller := newController()
|
|
defer cancel()
|
|
ctx := context.Background()
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
|
|
wf := v1alpha1.MustUnmarshalWorkflow(testRealtimeWorkflowMetricWithGlobalParameters)
|
|
_, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
|
|
require.NoError(t, err)
|
|
woc := newWorkflowOperationCtx(ctx, wf, controller)
|
|
|
|
woc.operate(ctx)
|
|
|
|
attribs := attribute.NewSet(attribute.String("label", "foobar"), attribute.String("workflowName", "test-foobar"))
|
|
_, err = testExporter.GetFloat64GaugeValue(woc.wf.Spec.Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// testProcessedRetryNode is a Running Workflow manifest with a DAG template
// my-dag that runs template A. Template A declares the counter
// result_counter (labels work_unit=metrics-eg::A and
// workflow_result='{{status}}') and a retryStrategy. The recorded status has
// three nodes: the DAG node (Running), the Retry node for A (Running), and
// its child pod node A(0) (Succeeded, nodeFlag.retried set).
var testProcessedRetryNode = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: metrics-eg-lq4nj
spec:
  entrypoint: my-dag
  templates:
  - dag:
      tasks:
      - name: A
        template: A
    name: my-dag
  - container:
      args:
      - hello from A
      command:
      - cowsay
      image: docker/whalesay
    metrics:
      prometheus:
      - counter:
          value: "1"
        help: Number of argo workflows
        labels:
        - key: work_unit
          value: metrics-eg::A
        - key: workflow_result
          value: '{{status}}'
        name: result_counter
    name: A
    retryStrategy:
      backoff:
        duration: 2s
        factor: 1
        maxDuration: 6m
      limit: 2
      retryPolicy: Always
status:
  nodes:
    metrics-eg-lq4nj:
      children:
      - metrics-eg-lq4nj-4266717436
      displayName: metrics-eg-lq4nj
      finishedAt: "2021-01-13T16:14:03Z"
      id: metrics-eg-lq4nj
      name: metrics-eg-lq4nj
      outboundNodes:
      - metrics-eg-lq4nj-2568729143
      phase: Running
      startedAt: "2021-01-13T16:13:53Z"
      templateName: my-dag
      templateScope: local/metrics-eg-lq4nj
      type: DAG
    metrics-eg-lq4nj-2568729143:
      boundaryID: metrics-eg-lq4nj
      displayName: A(0)
      finishedAt: "2021-01-13T16:13:57Z"
      id: metrics-eg-lq4nj-2568729143
      name: metrics-eg-lq4nj.A(0)
      phase: Succeeded
      startedAt: "2021-01-13T16:13:53Z"
      templateName: A
      templateScope: local/metrics-eg-lq4nj
      type: Pod
      nodeFlag:
        retried: true
    metrics-eg-lq4nj-4266717436:
      boundaryID: metrics-eg-lq4nj
      children:
      - metrics-eg-lq4nj-2568729143
      displayName: A
      finishedAt: "2021-01-13T16:14:03Z"
      id: metrics-eg-lq4nj-4266717436
      name: metrics-eg-lq4nj.A
      phase: Running
      startedAt: "2021-01-13T16:13:53Z"
      templateName: A
      templateScope: local/metrics-eg-lq4nj
      type: Retry
  phase: Running
  startedAt: "2021-01-13T16:13:53Z"
`
|
|
|
|
func TestProcessedRetryNode(t *testing.T) {
|
|
cancel, controller := newController()
|
|
defer cancel()
|
|
ctx := context.Background()
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
|
|
wf := v1alpha1.MustUnmarshalWorkflow(testProcessedRetryNode)
|
|
_, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
|
|
require.NoError(t, err)
|
|
woc := newWorkflowOperationCtx(ctx, wf, controller)
|
|
|
|
woc.operate(ctx)
|
|
|
|
attribs := attribute.NewSet(attribute.String("work_unit", "metrics-eg::A"), attribute.String("workflow_result", "Succeeded"))
|
|
value, err := testExporter.GetFloat64CounterValue(woc.wf.Spec.Templates[1].Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
assert.InDelta(t, float64(1), value, 0.001)
|
|
}
|
|
|
|
// suspendWfWithMetrics is a Running Workflow manifest with a four-step
// template (build, approve, delay, release) and one workflow-level realtime
// gauge, exec_duration_gauge, labelled name=model_a. The recorded status
// shows the build step Succeeded and the approve Suspend node still Running.
var suspendWfWithMetrics = `apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: suspend-template-qndm5
spec:
  entrypoint: suspend
  metrics:
    prometheus:
    - gauge:
        realtime: true
        value: '{{workflow.duration}}'
      help: Duration gauge by name
      labels:
      - key: name
        value: model_a
      name: exec_duration_gauge
  templates:
  - name: suspend
    steps:
    - - name: build
        template: whalesay
    - - name: approve
        template: approve
    - - name: delay
        template: delay
    - - name: release
        template: whalesay
  - name: approve
    suspend: {}
  - name: delay
    suspend:
      duration: "20"
  - container:
      args:
      - hello world
      command:
      - cowsay
      image: docker/whalesay
      name: ""
    name: whalesay
  ttlStrategy:
    secondsAfterCompletion: 600
status:
  conditions:
  - status: "False"
    type: PodRunning
  finishedAt: null
  nodes:
    suspend-template-qndm5:
      children:
      - suspend-template-qndm5-343839516
      displayName: suspend-template-qndm5
      finishedAt: null
      id: suspend-template-qndm5
      name: suspend-template-qndm5
      phase: Running
      progress: 1/1
      startedAt: "2021-09-28T12:23:10Z"
      templateName: suspend
      templateScope: local/suspend-template-qndm5
      type: Steps
    suspend-template-qndm5-343839516:
      boundaryID: suspend-template-qndm5
      children:
      - suspend-template-qndm5-2823755246
      displayName: '[0]'
      finishedAt: "2021-09-28T12:23:20Z"
      id: suspend-template-qndm5-343839516
      name: suspend-template-qndm5[0]
      phase: Succeeded
      progress: 1/1
      resourcesDuration:
        cpu: 6
        memory: 3
      startedAt: "2021-09-28T12:23:10Z"
      templateScope: local/suspend-template-qndm5
      type: StepGroup
    suspend-template-qndm5-2823755246:
      boundaryID: suspend-template-qndm5
      children:
      - suspend-template-qndm5-3632002577
      displayName: build
      finishedAt: "2021-09-28T12:23:16Z"
      hostNodeName: kind-control-plane
      id: suspend-template-qndm5-2823755246
      name: suspend-template-qndm5[0].build
      outputs:
        exitCode: "0"
      phase: Succeeded
      progress: 1/1
      resourcesDuration:
        cpu: 6
        memory: 3
      startedAt: "2021-09-28T12:23:10Z"
      templateName: whalesay
      templateScope: local/suspend-template-qndm5
      type: Pod
    suspend-template-qndm5-3456849218:
      boundaryID: suspend-template-qndm5
      displayName: approve
      finishedAt: null
      id: suspend-template-qndm5-3456849218
      name: suspend-template-qndm5[1].approve
      phase: Running
      startedAt: "2021-09-28T12:23:20Z"
      templateName: approve
      templateScope: local/suspend-template-qndm5
      type: Suspend
    suspend-template-qndm5-3632002577:
      boundaryID: suspend-template-qndm5
      children:
      - suspend-template-qndm5-3456849218
      displayName: '[1]'
      finishedAt: null
      id: suspend-template-qndm5-3632002577
      name: suspend-template-qndm5[1]
      phase: Running
      startedAt: "2021-09-28T12:23:20Z"
      templateScope: local/suspend-template-qndm5
      type: StepGroup
  phase: Running
  progress: 1/1
  resourcesDuration:
    cpu: 6
    memory: 3
  startedAt: "2021-09-28T12:23:10Z"
`
|
|
|
|
func TestControllerRestartWithRunningWorkflow(t *testing.T) {
|
|
cancel, controller := newController()
|
|
defer cancel()
|
|
ctx := context.Background()
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
|
|
wf := v1alpha1.MustUnmarshalWorkflow(suspendWfWithMetrics)
|
|
_, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
|
|
require.NoError(t, err)
|
|
woc := newWorkflowOperationCtx(ctx, wf, controller)
|
|
|
|
woc.operate(ctx)
|
|
attribs := attribute.NewSet(attribute.String("name", "model_a"))
|
|
_, err = testExporter.GetFloat64GaugeValue(woc.wf.Spec.Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// runtimeWfMetrics is a Workflow manifest whose DAG entrypoint dag-task runs
// one task (TEST-ONE, template echo). It declares one workflow-level counter,
// playground_workflow_new, labelled playground_id_workflow_counter=test and
// status="{{workflow.status}}".
var runtimeWfMetrics = `apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-task-
spec:
  entrypoint: dag-task
  metrics: # Custom metric workflow level
    prometheus:
      - name: playground_workflow_new
        help: "Count of workflow execution by result status - workflow level"
        labels:
          - key: "playground_id_workflow_counter"
            value: "test"
          - key: status
            value: "{{workflow.status}}"
        counter:
          value: "1"
  templates:
    - name: dag-task
      dag:
        tasks:
          - name: TEST-ONE
            template: echo

    - name: echo
      container:
        image: alpine:3.7
        command: [echo, "hello"]
`
|
|
|
|
func TestRuntimeMetrics(t *testing.T) {
|
|
cancel, controller := newController()
|
|
defer cancel()
|
|
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
|
|
wf := v1alpha1.MustUnmarshalWorkflow(runtimeWfMetrics)
|
|
ctx := context.Background()
|
|
ctx = logging.WithLogger(ctx, logging.NewSlogLogger(logging.GetGlobalLevel(), logging.GetGlobalFormat()))
|
|
_, err := wfcset.Create(ctx, wf, metav1.CreateOptions{})
|
|
require.NoError(t, err)
|
|
woc := newWorkflowOperationCtx(ctx, wf, controller)
|
|
woc.operate(ctx) // create step node
|
|
|
|
makePodsPhase(ctx, woc, apiv1.PodSucceeded) // pod is successful - manually workflow is succeeded
|
|
woc = newWorkflowOperationCtx(ctx, woc.wf, controller)
|
|
woc.operate(ctx) // node status of previous context
|
|
|
|
attribs := attribute.NewSet(attribute.String("playground_id_workflow_counter", "test"), attribute.String("status", "Succeeded"))
|
|
value, err := testExporter.GetFloat64CounterValue(woc.wf.Spec.Metrics.Prometheus[0].Name, &attribs)
|
|
require.NoError(t, err)
|
|
assert.InDelta(t, float64(1), value, 0.001)
|
|
}
|