fix(sdk): visualizations and metrics do not work with data_passing_methods (#6882)

* Update _data_passing_using_volume.py

* Update _data_passing_using_volume.py

* Update _data_passing_using_volume.py

* Update _data_passing_using_volume.py

* Update _data_passing_using_volume.py

* Update artifact_passing_using_volume.yaml

* Update artifact_passing_using_volume.py

* Update _data_passing_using_volume.py
Authored by juliusvonkohout on 2021-11-16 02:33:12 +01:00, committed by GitHub
Parent: 9ee4534aef
Commit: b482ba83d8
3 changed files with 215 additions and 88 deletions
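
Note: with a volume-based data_passing_method configured, the compiler rewrote every output artifact onto a shared volume, which also stripped the mlpipeline-ui-metadata and mlpipeline-metrics artifacts that the KFP frontend reads, so visualizations and metrics disappeared from the UI. For reference, a minimal sketch of how a pipeline opts into volume-based data passing (assuming the KFP v1 SDK; the volume and claim names follow the test data in this commit):

# Sketch, not part of the diff: configure volume-based data passing.
from kfp.dsl import data_passing_methods
from kubernetes.client.models import V1Volume, V1PersistentVolumeClaimVolumeSource

volume_based_data_passing_method = data_passing_methods.KubernetesVolume(
    volume=V1Volume(
        name='data-storage',
        persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
            claim_name='data-volume')),
    path_prefix='artifact_data/',  # matches the subPath prefix in the compiled YAML below
)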

File: _data_passing_using_volume.py

@@ -128,7 +128,12 @@ def rewrite_data_passing_to_use_volumes(
                 'name': subpath_parameter_name,
                 'value': output_subpath,  # Requires Argo 2.3.0+
             })
-        template.get('outputs', {}).pop('artifacts', None)
+        whitelist = ['mlpipeline-ui-metadata', 'mlpipeline-metrics']
+        output_artifacts = [artifact for artifact in output_artifacts if artifact['name'] in whitelist]
+        if not output_artifacts:
+            template.get('outputs', {}).pop('artifacts', None)
+        else:
+            template.get('outputs', {}).update({'artifacts': output_artifacts})
 
     # Rewrite DAG templates
     for template in templates:
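
Note: the old code dropped every output artifact after rewriting it to a volume subPath; the new code keeps the two artifacts the KFP UI consumes. An illustrative, self-contained snippet of the filtering effect (hypothetical template dict, not the rewriter itself):

# Illustrative only: effect of the new whitelist on an Argo template dict.
template = {
    'outputs': {
        'artifacts': [
            {'name': 'mlpipeline-metrics', 'path': '/tmp/outputs/mlpipeline_metrics/data'},
            {'name': 'my-output', 'path': '/tmp/outputs/my_output/data'},
        ]
    }
}
whitelist = ['mlpipeline-ui-metadata', 'mlpipeline-metrics']
output_artifacts = [a for a in template['outputs']['artifacts'] if a['name'] in whitelist]
# 'my-output' is rewritten to a volume subPath elsewhere; only the UI artifacts
# remain real Argo artifacts so the frontend can still fetch them.
assert [a['name'] for a in output_artifacts] == ['mlpipeline-metrics']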

File: artifact_passing_using_volume.py

@@ -1,7 +1,8 @@
 from pathlib import Path
 import kfp
-from kfp.components import load_component_from_file
+from kfp.components import load_component_from_file, create_component_from_func
+from typing import NamedTuple
 
 test_data_dir = Path(__file__).parent / 'test_data'
 
 producer_op = load_component_from_file(
@@ -11,6 +12,33 @@ processor_op = load_component_from_file(
 consumer_op = load_component_from_file(
     str(test_data_dir / 'consume_2.component.yaml'))
 
+def metadata_and_metrics() -> NamedTuple(
+    "Outputs",
+    [("mlpipeline_ui_metadata", "UI_metadata"), ("mlpipeline_metrics", "Metrics")],
+):
+    metadata = {
+        "outputs": [
+            {"storage": "inline", "source": "*this should be bold*", "type": "markdown"}
+        ]
+    }
+    metrics = {
+        "metrics": [
+            {
+                "name": "train-accuracy",
+                "numberValue": 0.9,
+            },
+            {
+                "name": "test-accuracy",
+                "numberValue": 0.7,
+            },
+        ]
+    }
+    from collections import namedtuple
+    import json
+
+    return namedtuple("output", ["mlpipeline_ui_metadata", "mlpipeline_metrics"])(
+        json.dumps(metadata), json.dumps(metrics)
+    )
 
 @kfp.dsl.pipeline()
 def artifact_passing_pipeline():
@@ -20,6 +48,7 @@ def artifact_passing_pipeline():
     consumer_task = consumer_op(processor_task.outputs['output_1'],
                                 processor_task.outputs['output_2'])
+    markdown_task = create_component_from_func(func=metadata_and_metrics)()
 
     # This line is only needed for compiling using dsl-compile to work
     kfp.dsl.get_pipeline_conf(
     ).data_passing_method = volume_based_data_passing_method
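
Note: a usage sketch, not part of the diff — the golden YAML below would be regenerated by compiling this test pipeline with the v1 compiler:

# Assumed usage: regenerate the compiled workflow from the pipeline above.
# 'artifact_passing_pipeline' is the pipeline function defined in this file.
import kfp.compiler

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(
        artifact_passing_pipeline, 'artifact_passing_using_volume.yaml')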

File: artifact_passing_using_volume.yaml

@@ -1,83 +1,159 @@
 apiVersion: argoproj.io/v1alpha1
 kind: Workflow
 metadata:
-  annotations:
-    pipelines.kubeflow.org/pipeline_spec: '{"name": "artifact_passing_pipeline"}'
   generateName: artifact-passing-pipeline-
+  annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.9, pipelines.kubeflow.org/pipeline_compilation_time: '2021-11-15T11:23:42.469722',
+    pipelines.kubeflow.org/pipeline_spec: '{"name": "Artifact passing pipeline"}'}
+  labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.9}
 spec:
-  arguments:
-    parameters: []
   entrypoint: artifact-passing-pipeline
-  serviceAccountName: pipeline-runner
   templates:
   - name: artifact-passing-pipeline
     dag:
       tasks:
       - name: consumer
         template: consumer
+        dependencies: [processor]
         arguments:
           parameters:
-          - name: processor-Output-1
-            value: '{{tasks.processor.outputs.parameters.processor-Output-1}}'
-          - name: processor-Output-2-subpath
-            value: '{{tasks.processor.outputs.parameters.processor-Output-2-subpath}}'
-        dependencies:
-        - processor
+          - {name: processor-Output-1, value: '{{tasks.processor.outputs.parameters.processor-Output-1}}'}
+          - {name: processor-Output-2-subpath, value: '{{tasks.processor.outputs.parameters.processor-Output-2-subpath}}'}
+      - {name: metadata-and-metrics, template: metadata-and-metrics}
       - name: processor
         template: processor
+        dependencies: [producer]
         arguments:
           parameters:
-          - name: producer-Output-1
-            value: '{{tasks.producer.outputs.parameters.producer-Output-1}}'
-          - name: producer-Output-2-subpath
-            value: '{{tasks.producer.outputs.parameters.producer-Output-2-subpath}}'
-        dependencies:
-        - producer
-      - name: producer
-        template: producer
+          - {name: producer-Output-1, value: '{{tasks.producer.outputs.parameters.producer-Output-1}}'}
+          - {name: producer-Output-2-subpath, value: '{{tasks.producer.outputs.parameters.producer-Output-2-subpath}}'}
+      - {name: producer, template: producer}
   - name: consumer
-    metadata:
-      annotations:
-        pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "Input parameter"}, {"name": "Input artifact"}], "name": "Consumer"}'
-    inputs:
-      parameters:
-      - name: processor-Output-1
-      - name: processor-Output-2-subpath
     container:
-      image: alpine
+      args: ['{{inputs.parameters.processor-Output-1}}', /tmp/inputs/Input_artifact/data]
       command:
       - sh
       - -c
       - |
         echo "Input parameter = $0"
         echo "Input artifact = " && cat "$1"
-      args:
-      - '{{inputs.parameters.processor-Output-1}}'
-      - /tmp/inputs/Input_artifact/data
+      image: alpine
       volumeMounts:
-      - name: data-storage
-        mountPath: /tmp/inputs/Input_artifact
-        readOnly: true
-        subPath: '{{inputs.parameters.processor-Output-2-subpath}}'
-  - name: processor
-    metadata:
-      annotations:
-        pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "Input parameter"}, {"name": "Input artifact"}], "name": "Processor", "outputs": [{"name": "Output 1"}, {"name": "Output 2"}]}'
+      - {mountPath: /tmp/inputs/Input_artifact, name: data-storage, subPath: '{{inputs.parameters.processor-Output-2-subpath}}',
+        readOnly: true}
     inputs:
       parameters:
-      - name: producer-Output-1
-      - name: producer-Output-2-subpath
+      - {name: processor-Output-1}
+      - {name: processor-Output-2-subpath}
+    metadata:
+      labels:
+        pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
+        pipelines.kubeflow.org/pipeline-sdk-type: kfp
+        pipelines.kubeflow.org/enable_caching: "true"
+      annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
+        {"args": [{"inputValue": "Input parameter"}, {"inputPath": "Input artifact"}],
+        "command": ["sh", "-c", "echo \"Input parameter = $0\"\necho \"Input artifact
+        = \" && cat \"$1\"\n"], "image": "alpine"}}, "inputs": [{"name": "Input
+        parameter"}, {"name": "Input artifact"}], "name": "Consumer"}', pipelines.kubeflow.org/component_ref: '{"digest":
+        "1a8ea3c29c7853bf63d9b4fbd76a66b273621d2229c3cfe08ed68620ebf02982", "url":
+        "testdata/test_data/consume_2.component.yaml"}',
+        pipelines.kubeflow.org/arguments.parameters: '{"Input parameter": "{{inputs.parameters.processor-Output-1}}"}'}
+  - name: metadata-and-metrics
+    container:
+      args: ['----output-paths', /tmp/outputs/mlpipeline_ui_metadata/data, /tmp/outputs/mlpipeline_metrics/data]
+      command:
+      - sh
+      - -ec
+      - |
+        program_path=$(mktemp)
+        printf "%s" "$0" > "$program_path"
+        python3 -u "$program_path" "$@"
+      - |
+        def metadata_and_metrics():
+            metadata = {
+                "outputs": [
+                    {"storage": "inline", "source": "*this should be bold*", "type": "markdown"}
+                ]
+            }
+            metrics = {
+                "metrics": [
+                    {
+                        "name": "train-accuracy",
+                        "numberValue": 0.9,
+                    },
+                    {
+                        "name": "test-accuracy",
+                        "numberValue": 0.7,
+                    },
+                ]
+            }
+            from collections import namedtuple
+            import json
+
+            return namedtuple("output", ["mlpipeline_ui_metadata", "mlpipeline_metrics"])(
+                json.dumps(metadata), json.dumps(metrics)
+            )
+
+        import argparse
+        _parser = argparse.ArgumentParser(prog='Metadata and metrics', description='')
+        _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2)
+        _parsed_args = vars(_parser.parse_args())
+        _output_files = _parsed_args.pop("_output_paths", [])
+
+        _outputs = metadata_and_metrics(**_parsed_args)
+
+        _output_serializers = [
+            str,
+            str,
+        ]
+
+        import os
+        for idx, output_file in enumerate(_output_files):
+            try:
+                os.makedirs(os.path.dirname(output_file))
+            except OSError:
+                pass
+            with open(output_file, 'w') as f:
+                f.write(_output_serializers[idx](_outputs[idx]))
+      image: python:3.7
+      volumeMounts:
+      - {mountPath: /tmp/outputs/mlpipeline_ui_metadata, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/mlpipeline-ui-metadata'}
+      - {mountPath: /tmp/outputs/mlpipeline_metrics, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/mlpipeline-metrics'}
     outputs:
       parameters:
-      - name: processor-Output-1
-        valueFrom:
-          path: /tmp/outputs/Output_1/data
-      - name: processor-Output-1-subpath
-        value: artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-1
-      - name: processor-Output-2-subpath
-        value: artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-2
+      - {name: mlpipeline-ui-metadata-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/mlpipeline-ui-metadata'}
+      - {name: mlpipeline-metrics-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/mlpipeline-metrics'}
+      artifacts:
+      - {name: mlpipeline-ui-metadata, path: /tmp/outputs/mlpipeline_ui_metadata/data}
+      - {name: mlpipeline-metrics, path: /tmp/outputs/mlpipeline_metrics/data}
+    metadata:
+      labels:
+        pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
+        pipelines.kubeflow.org/pipeline-sdk-type: kfp
+        pipelines.kubeflow.org/enable_caching: "true"
+      annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
+        {"args": ["----output-paths", {"outputPath": "mlpipeline_ui_metadata"},
+        {"outputPath": "mlpipeline_metrics"}], "command": ["sh", "-ec", "program_path=$(mktemp)\nprintf
+        \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
+        "def metadata_and_metrics():\n    metadata = {\n        \"outputs\": [\n            {\"storage\":
+        \"inline\", \"source\": \"*this should be bold*\", \"type\": \"markdown\"}\n        ]\n    }\n    metrics
+        = {\n        \"metrics\": [\n            {\n                \"name\": \"train-accuracy\",\n                \"numberValue\":
+        0.9,\n            },\n            {\n                \"name\": \"test-accuracy\",\n                \"numberValue\":
+        0.7,\n            },\n        ]\n    }\n    from collections import namedtuple\n    import
+        json\n\n    return namedtuple(\"output\", [\"mlpipeline_ui_metadata\", \"mlpipeline_metrics\"])(\n        json.dumps(metadata),
+        json.dumps(metrics)\n    )\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Metadata
+        and metrics'', description='''')\n_parser.add_argument(\"----output-paths\",
+        dest=\"_output_paths\", type=str, nargs=2)\n_parsed_args = vars(_parser.parse_args())\n_output_files
+        = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = metadata_and_metrics(**_parsed_args)\n\n_output_serializers
+        = [\n    str,\n    str,\n\n]\n\nimport os\nfor idx, output_file in enumerate(_output_files):\n    try:\n        os.makedirs(os.path.dirname(output_file))\n    except
+        OSError:\n        pass\n    with open(output_file, ''w'') as f:\n        f.write(_output_serializers[idx](_outputs[idx]))\n"],
+        "image": "python:3.7"}}, "name": "Metadata and metrics", "outputs": [{"name":
+        "mlpipeline_ui_metadata", "type": "UI_metadata"}, {"name": "mlpipeline_metrics",
+        "type": "Metrics"}]}', pipelines.kubeflow.org/component_ref: '{}'}
+  - name: processor
     container:
-      image: alpine
+      args: ['{{inputs.parameters.producer-Output-1}}', /tmp/inputs/Input_artifact/data,
+        /tmp/outputs/Output_1/data, /tmp/outputs/Output_2/data]
       command:
       - sh
       - -c
@@ -86,37 +162,40 @@ spec:
         mkdir -p "$(dirname "$3")"
         echo "$0" > "$2"
         cp "$1" "$3"
-      args:
-      - '{{inputs.parameters.producer-Output-1}}'
-      - /tmp/inputs/Input_artifact/data
-      - /tmp/outputs/Output_1/data
-      - /tmp/outputs/Output_2/data
+      image: alpine
       volumeMounts:
-      - mountPath: /tmp/inputs/Input_artifact
-        name: data-storage
-        readOnly: true
-        subPath: '{{inputs.parameters.producer-Output-2-subpath}}'
-      - mountPath: /tmp/outputs/Output_1
-        name: data-storage
-        subPath: artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-1
-      - mountPath: /tmp/outputs/Output_2
-        name: data-storage
-        subPath: artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-2
-  - name: producer
-    metadata:
-      annotations:
-        pipelines.kubeflow.org/component_spec: '{"name": "Producer", "outputs": [{"name": "Output 1"}, {"name": "Output 2"}]}'
+      - {mountPath: /tmp/inputs/Input_artifact, name: data-storage, subPath: '{{inputs.parameters.producer-Output-2-subpath}}',
+        readOnly: true}
+      - {mountPath: /tmp/outputs/Output_1, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-1'}
+      - {mountPath: /tmp/outputs/Output_2, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-2'}
+    inputs:
+      parameters:
+      - {name: producer-Output-1}
+      - {name: producer-Output-2-subpath}
     outputs:
       parameters:
-      - name: producer-Output-1
-        valueFrom:
-          path: /tmp/outputs/Output_1/data
-      - name: producer-Output-1-subpath
-        value: artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-1
-      - name: producer-Output-2-subpath
-        value: artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-2
+      - name: processor-Output-1
+        valueFrom: {path: /tmp/outputs/Output_1/data}
+      - {name: processor-Output-1-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-1'}
+      - {name: processor-Output-2-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/processor-Output-2'}
+    metadata:
+      labels:
+        pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
+        pipelines.kubeflow.org/pipeline-sdk-type: kfp
+        pipelines.kubeflow.org/enable_caching: "true"
+      annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
+        {"args": [{"inputValue": "Input parameter"}, {"inputPath": "Input artifact"},
+        {"outputPath": "Output 1"}, {"outputPath": "Output 2"}], "command": ["sh",
+        "-c", "mkdir -p \"$(dirname \"$2\")\"\nmkdir -p \"$(dirname \"$3\")\"\necho
+        \"$0\" > \"$2\"\ncp \"$1\" \"$3\"\n"], "image": "alpine"}}, "inputs": [{"name":
+        "Input parameter"}, {"name": "Input artifact"}], "name": "Processor", "outputs":
+        [{"name": "Output 1"}, {"name": "Output 2"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
+        "f11a277f5c5cbc27a2e2cda412547b607671d88a4e7aa8a1665dadb836b592b3", "url":
+        "testdata/test_data/process_2_2.component.yaml"}',
+        pipelines.kubeflow.org/arguments.parameters: '{"Input parameter": "{{inputs.parameters.producer-Output-1}}"}'}
+  - name: producer
     container:
-      image: alpine
+      args: [/tmp/outputs/Output_1/data, /tmp/outputs/Output_2/data]
       command:
       - sh
       - -c
@@ -125,17 +204,31 @@ spec:
         mkdir -p "$(dirname "$1")"
         echo "Data 1" > $0
         echo "Data 2" > $1
-      args:
-      - /tmp/outputs/Output_1/data
-      - /tmp/outputs/Output_2/data
+      image: alpine
       volumeMounts:
-      - mountPath: /tmp/outputs/Output_1
-        name: data-storage
-        subPath: artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-1
-      - mountPath: /tmp/outputs/Output_2
-        name: data-storage
-        subPath: artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-2
+      - {mountPath: /tmp/outputs/Output_1, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-1'}
+      - {mountPath: /tmp/outputs/Output_2, name: data-storage, subPath: 'artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-2'}
+    outputs:
+      parameters:
+      - name: producer-Output-1
+        valueFrom: {path: /tmp/outputs/Output_1/data}
+      - {name: producer-Output-1-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-1'}
+      - {name: producer-Output-2-subpath, value: 'artifact_data/{{workflow.uid}}_{{pod.name}}/producer-Output-2'}
+    metadata:
+      labels:
+        pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
+        pipelines.kubeflow.org/pipeline-sdk-type: kfp
+        pipelines.kubeflow.org/enable_caching: "true"
+      annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
+        {"args": [{"outputPath": "Output 1"}, {"outputPath": "Output 2"}], "command":
+        ["sh", "-c", "mkdir -p \"$(dirname \"$0\")\"\nmkdir -p \"$(dirname \"$1\")\"\necho
+        \"Data 1\" > $0\necho \"Data 2\" > $1\n"], "image": "alpine"}}, "name":
+        "Producer", "outputs": [{"name": "Output 1"}, {"name": "Output 2"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
+        "7399eb54ee94a95708fa9f8a47330a39258b22a319a48458e14a63dcedb87ea4", "url":
+        "testdata/test_data/produce_2.component.yaml"}'}
+  arguments:
+    parameters: []
+  serviceAccountName: pipeline-runner
   volumes:
   - name: data-storage
-    persistentVolumeClaim:
-      claimName: data-volume
+    persistentVolumeClaim: {claimName: data-volume}
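
Note: a quick sanity check, not part of the commit — after compilation, the metadata-and-metrics template should still declare the UI artifacts as Argo artifacts while all other outputs ride on the volume. A sketch assuming PyYAML is installed:

# Verify the rewritten workflow keeps the whitelisted UI artifacts.
import yaml

with open('artifact_passing_using_volume.yaml') as f:
    workflow = yaml.safe_load(f)

template = next(t for t in workflow['spec']['templates']
                if t['name'] == 'metadata-and-metrics')
names = {a['name'] for a in template['outputs']['artifacts']}
assert names == {'mlpipeline-ui-metadata', 'mlpipeline-metrics'}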