# PIPELINE DEFINITION
# Name: preprocess
# Description: Dummy preprocessing step.
# Inputs:
#    input_dict_parameter: dict
#    input_list_parameter: list
#    message: str
# Outputs:
#    output_bool_parameter_path: bool
#    output_dataset_one: system.Dataset
#    output_dataset_two_path: system.Dataset
#    output_dict_parameter_path: dict
#    output_list_parameter_path: list
#    output_parameter_path: str
components:
  comp-preprocess:
    executorLabel: exec-preprocess
    inputDefinitions:
      parameters:
        input_dict_parameter:
          parameterType: STRUCT
        input_list_parameter:
          parameterType: LIST
        message:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        output_dataset_one:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
        output_dataset_two_path:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
      parameters:
        output_bool_parameter_path:
          parameterType: BOOLEAN
        output_dict_parameter_path:
          parameterType: STRUCT
        output_list_parameter_path:
          parameterType: LIST
        output_parameter_path:
          parameterType: STRING
deploymentSpec:
  executors:
    exec-preprocess:
      container:
        args:
          - --executor_input
          - '{{$}}'
          - --function_to_execute
          - preprocess
        # The command is a three-stage chain. Each `sh -c` stage receives the
        # remaining list items as "$0" "$@" and hands them on to the next one.
        command:
          - sh
          - -c
          # Stage 1: make sure pip exists, install the pinned kfp SDK, then
          # run the next stage ("$0" is the second `sh` below). The scalar
          # intentionally starts with a blank line (a leading newline).
          - |

            if ! [ -x "$(command -v pip)" ]; then
                python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
            fi

            PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
          - sh
          - -ec
          # Stage 2: "$0" is the Python source below; write it to a temporary
          # module and run it through the KFP executor with the container args.
          - |
            program_path=$(mktemp -d)

            printf "%s" "$0" > "$program_path/ephemeral_component.py"
            _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
          # Stage 3 payload: the component source. `|+` (keep chomping) plus
          # the blank line after the code preserves the two trailing newlines
          # of the value; the blank first line preserves the leading newline.
          - |+

            import kfp
            from kfp import dsl
            from kfp.dsl import *
            from typing import *

            def preprocess(
                # An input parameter of type string.
                message: str,
                # An input parameter of type dict.
                input_dict_parameter: Dict[str, int],
                # An input parameter of type list.
                input_list_parameter: List[str],
                # Use Output[T] to get a metadata-rich handle to the output artifact
                # of type `Dataset`.
                output_dataset_one: Output[Dataset],
                # A locally accessible filepath for another output artifact of type
                # `Dataset`.
                output_dataset_two_path: OutputPath('Dataset'),
                # A locally accessible filepath for an output parameter of type string.
                output_parameter_path: OutputPath(str),
                # A locally accessible filepath for an output parameter of type bool.
                output_bool_parameter_path: OutputPath(bool),
                # A locally accessible filepath for an output parameter of type dict.
                output_dict_parameter_path: OutputPath(Dict[str, int]),
                # A locally accessible filepath for an output parameter of type list.
                output_list_parameter_path: OutputPath(List[str]),
            ):
                """Dummy preprocessing step."""

                # Use Dataset.path to access a local file path for writing.
                # One can also use Dataset.uri to access the actual URI file path.
                with open(output_dataset_one.path, 'w') as f:
                    f.write(message)

                # OutputPath is used to just pass the local file path of the output artifact
                # to the function.
                with open(output_dataset_two_path, 'w') as f:
                    f.write(message)

                with open(output_parameter_path, 'w') as f:
                    f.write(message)

                with open(output_bool_parameter_path, 'w') as f:
                    f.write(
                        str(True))  # use either `str()` or `json.dumps()` for bool values.

                import json
                with open(output_dict_parameter_path, 'w') as f:
                    f.write(json.dumps(input_dict_parameter))

                with open(output_list_parameter_path, 'w') as f:
                    f.write(json.dumps(input_list_parameter))

        image: python:3.7
pipelineInfo:
  name: preprocess
root:
  dag:
    # Every pipeline-level output is forwarded from the single `preprocess`
    # task under the same key.
    outputs:
      artifacts:
        output_dataset_one:
          artifactSelectors:
            - outputArtifactKey: output_dataset_one
              producerSubtask: preprocess
        output_dataset_two_path:
          artifactSelectors:
            - outputArtifactKey: output_dataset_two_path
              producerSubtask: preprocess
      parameters:
        output_bool_parameter_path:
          valueFromParameter:
            outputParameterKey: output_bool_parameter_path
            producerSubtask: preprocess
        output_dict_parameter_path:
          valueFromParameter:
            outputParameterKey: output_dict_parameter_path
            producerSubtask: preprocess
        output_list_parameter_path:
          valueFromParameter:
            outputParameterKey: output_list_parameter_path
            producerSubtask: preprocess
        output_parameter_path:
          valueFromParameter:
            outputParameterKey: output_parameter_path
            producerSubtask: preprocess
    tasks:
      preprocess:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-preprocess
        # Pipeline inputs are passed straight through to the component.
        inputs:
          parameters:
            input_dict_parameter:
              componentInputParameter: input_dict_parameter
            input_list_parameter:
              componentInputParameter: input_list_parameter
            message:
              componentInputParameter: message
        taskInfo:
          name: preprocess
  inputDefinitions:
    parameters:
      input_dict_parameter:
        parameterType: STRUCT
      input_list_parameter:
        parameterType: LIST
      message:
        parameterType: STRING
  outputDefinitions:
    artifacts:
      output_dataset_one:
        artifactType:
          schemaTitle: system.Dataset
          schemaVersion: 0.0.1
      output_dataset_two_path:
        artifactType:
          schemaTitle: system.Dataset
          schemaVersion: 0.0.1
    parameters:
      output_bool_parameter_path:
        parameterType: BOOLEAN
      output_dict_parameter_path:
        parameterType: STRUCT
      output_list_parameter_path:
        parameterType: LIST
      output_parameter_path:
        parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.1.3