# File-viewer residue, not part of the pipeline spec: 177 lines, 6.8 KiB, YAML
# PIPELINE DEFINITION
# Name: preprocess
# Description: Dummy preprocessing step.
# Inputs:
#    input_dict_parameter: dict
#    input_list_parameter: list
#    message: str
# Outputs:
#    output_bool_parameter_path: bool
#    output_dataset_one: system.Dataset
#    output_dataset_two_path: system.Dataset
#    output_dict_parameter_path: dict
#    output_list_parameter_path: list
#    output_parameter_path: str
# Component interfaces. `comp-preprocess` declares three input parameters,
# two Dataset output artifacts and four output parameters, and is bound to
# the executor labelled `exec-preprocess`.
components:
  comp-preprocess:
    executorLabel: exec-preprocess
    inputDefinitions:
      parameters:
        input_dict_parameter:
          parameterType: STRUCT
        input_list_parameter:
          parameterType: LIST
        message:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        output_dataset_one:
          artifactType:
            schemaTitle: system.Dataset
            # Quoted so it can never be re-typed as a number by tooling.
            schemaVersion: '0.0.1'
        output_dataset_two_path:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: '0.0.1'
      parameters:
        output_bool_parameter_path:
          parameterType: BOOLEAN
        output_dict_parameter_path:
          parameterType: STRUCT
        output_list_parameter_path:
          parameterType: LIST
        output_parameter_path:
          parameterType: STRING
# Executor definitions: the container invocation behind `exec-preprocess`.
#
# The container runs `command` followed by `args`, i.e.
#   sh -c <bootstrap> sh -ec <launcher> <python source> <args...>
# so inside <bootstrap> "$0" is `sh` and "$@" starts with `-ec <launcher>`,
# and inside <launcher> "$0" is the Python source and "$@" is `args`.
deploymentSpec:
  executors:
    exec-preprocess:
      container:
        args:
          - --executor_input
          # Quoted: a leading `{` would otherwise start a YAML flow mapping.
          - '{{$}}'
          - --function_to_execute
          - preprocess
        command:
          - sh
          - -c
          # Bootstrap script: make sure pip exists, install the pinned kfp
          # package, then re-exec the remaining command words via "$0" "$@".
          # The blank first line is part of the value (leading newline).
          - |

            if ! [ -x "$(command -v pip)" ]; then
                python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
            fi

            PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
          - sh
          - -ec
          # Launcher script: write the Python source received as "$0" to a
          # temporary module file and run it through kfp.dsl.executor_main,
          # forwarding the container args as "$@".
          - |
            program_path=$(mktemp -d)

            printf "%s" "$0" > "$program_path/ephemeral_component.py"
            _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
          # Python source of the component. Block indentation is significant
          # here: function body at 4 spaces, `with` bodies at 8. `|+` keeps the
          # single trailing blank line so the value still ends in two newlines.
          - |+

            import kfp
            from kfp import dsl
            from kfp.dsl import *
            from typing import *

            def preprocess(
                # An input parameter of type string.
                message: str,
                # An input parameter of type dict.
                input_dict_parameter: Dict[str, int],
                # An input parameter of type list.
                input_list_parameter: List[str],
                # Use Output[T] to get a metadata-rich handle to the output artifact
                # of type `Dataset`.
                output_dataset_one: Output[Dataset],
                # A locally accessible filepath for another output artifact of type
                # `Dataset`.
                output_dataset_two_path: OutputPath('Dataset'),
                # A locally accessible filepath for an output parameter of type string.
                output_parameter_path: OutputPath(str),
                # A locally accessible filepath for an output parameter of type bool.
                output_bool_parameter_path: OutputPath(bool),
                # A locally accessible filepath for an output parameter of type dict.
                output_dict_parameter_path: OutputPath(Dict[str, int]),
                # A locally accessible filepath for an output parameter of type list.
                output_list_parameter_path: OutputPath(List[str]),
            ):
                """Dummy preprocessing step."""

                # Use Dataset.path to access a local file path for writing.
                # One can also use Dataset.uri to access the actual URI file path.
                with open(output_dataset_one.path, 'w') as f:
                    f.write(message)

                # OutputPath is used to just pass the local file path of the output artifact
                # to the function.
                with open(output_dataset_two_path, 'w') as f:
                    f.write(message)

                with open(output_parameter_path, 'w') as f:
                    f.write(message)

                with open(output_bool_parameter_path, 'w') as f:
                    f.write(
                        str(True))  # use either `str()` or `json.dumps()` for bool values.

                import json
                with open(output_dict_parameter_path, 'w') as f:
                    f.write(json.dumps(input_dict_parameter))

                with open(output_list_parameter_path, 'w') as f:
                    f.write(json.dumps(input_list_parameter))

        image: python:3.7  # NOTE(review): Python 3.7 is end-of-life; confirm before reuse.
# Pipeline identity; the name matches the single component it wraps.
pipelineInfo:
  name: preprocess
# Root pipeline. The DAG contains one task, `preprocess`, which receives the
# pipeline's three input parameters unchanged; every output of that task is
# surfaced again as a pipeline-level output of the same name.
root:
  dag:
    outputs:
      artifacts:
        output_dataset_one:
          artifactSelectors:
            - outputArtifactKey: output_dataset_one
              producerSubtask: preprocess
        output_dataset_two_path:
          artifactSelectors:
            - outputArtifactKey: output_dataset_two_path
              producerSubtask: preprocess
      parameters:
        output_bool_parameter_path:
          valueFromParameter:
            outputParameterKey: output_bool_parameter_path
            producerSubtask: preprocess
        output_dict_parameter_path:
          valueFromParameter:
            outputParameterKey: output_dict_parameter_path
            producerSubtask: preprocess
        output_list_parameter_path:
          valueFromParameter:
            outputParameterKey: output_list_parameter_path
            producerSubtask: preprocess
        output_parameter_path:
          valueFromParameter:
            outputParameterKey: output_parameter_path
            producerSubtask: preprocess
    tasks:
      preprocess:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-preprocess
        inputs:
          parameters:
            input_dict_parameter:
              componentInputParameter: input_dict_parameter
            input_list_parameter:
              componentInputParameter: input_list_parameter
            message:
              componentInputParameter: message
        taskInfo:
          name: preprocess
  # Pipeline-level interface; same parameter and artifact names and types as
  # the `comp-preprocess` component.
  inputDefinitions:
    parameters:
      input_dict_parameter:
        parameterType: STRUCT
      input_list_parameter:
        parameterType: LIST
      message:
        parameterType: STRING
  outputDefinitions:
    artifacts:
      output_dataset_one:
        artifactType:
          schemaTitle: system.Dataset
          # Quoted so it can never be re-typed as a number by tooling.
          schemaVersion: '0.0.1'
      output_dataset_two_path:
        artifactType:
          schemaTitle: system.Dataset
          schemaVersion: '0.0.1'
    parameters:
      output_bool_parameter_path:
        parameterType: BOOLEAN
      output_dict_parameter_path:
        parameterType: STRUCT
      output_list_parameter_path:
        parameterType: LIST
      output_parameter_path:
        parameterType: STRING
# Version of the pipeline-spec schema used by this document (quoted: it is a
# version string, not a number).
schemaVersion: '2.1.0'
# SDK version recorded for this spec.
sdkVersion: kfp-2.1.3