feat(sdk): support extracting input/output descriptions from component/pipeline docstrings (#9156)

* feat(sdk): support extracting input/output descriptions from component/pipeline docstrings

* remove numpydoc tests
Connor McCarthy 2023-04-24 10:16:44 -07:00 committed by GitHub
parent 7c0de99a9c
commit 946c51bafe
10 changed files with 805 additions and 1 deletion
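
With this change, the SDK parses Google-style docstrings on @dsl.component and @dsl.pipeline functions and carries per-input and per-output descriptions into the compiled IR. A minimal sketch of the behavior, assuming a kfp build that includes this commit (the component, names, and strings below are illustrative; the exact proto field paths follow the tests in this commit):

from kfp import dsl

@dsl.component
def normalize(text: str) -> str:
    """Normalize a string.

    Args:
        text: Raw input string.

    Returns:
        Output: The lower-cased string.
    """
    return text.lower()

# Descriptions extracted from the docstring land on the compiled spec.
spec = normalize.pipeline_spec
print(spec.root.input_definitions.parameters['text'].description)     # Raw input string.
print(spec.root.output_definitions.parameters['Output'].description)  # The lower-cased string.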


@ -2,6 +2,7 @@
## Features
* Support `display_name` and `description` in `@dsl.pipeline` decorator [\#9153](https://github.com/kubeflow/pipelines/pull/9153)
* Extract component input and output descriptions from docstring [\#9156](https://github.com/kubeflow/pipelines/pull/9156)
## Breaking changes


@ -3785,5 +3785,292 @@ class TestPlatformConfig(unittest.TestCase):
foo_platform_set_bar_feature(task, 12)
class ExtractInputOutputDescription(unittest.TestCase):
def test_no_descriptions(self):
from kfp import dsl
@dsl.component
def comp(
string: str,
in_artifact: Input[Artifact],
out_artifact: Output[Artifact],
) -> str:
return string
Outputs = NamedTuple(
'Outputs',
out_str=str,
out_artifact=Artifact,
)
@dsl.pipeline
def my_pipeline(
string: str,
in_artifact: Input[Artifact],
) -> Outputs:
t = comp(
string=string,
in_artifact=in_artifact,
)
return Outputs(
out_str=t.outputs['Output'],
out_artifact=t.outputs['out_artifact'])
pipeline_spec = my_pipeline.pipeline_spec
# test pipeline
# check the key with assertIn first to avoid false negatives from a wrong key and to make debugging easier
self.assertIn('string', pipeline_spec.root.input_definitions.parameters)
self.assertEqual(
pipeline_spec.root.input_definitions.parameters['string']
.description, '')
self.assertIn('in_artifact',
pipeline_spec.root.input_definitions.artifacts)
self.assertEqual(
pipeline_spec.root.input_definitions.artifacts['in_artifact']
.description, '')
self.assertIn('out_str',
pipeline_spec.root.output_definitions.parameters)
self.assertEqual(
pipeline_spec.root.output_definitions.parameters['out_str']
.description, '')
self.assertIn('out_artifact',
pipeline_spec.root.output_definitions.artifacts)
self.assertEqual(
pipeline_spec.root.output_definitions.artifacts['out_artifact']
.description, '')
# test component
# check the key with assertIn first to avoid false negatives from a wrong key and to make debugging easier
self.assertIn(
'string',
pipeline_spec.components['comp-comp'].input_definitions.parameters)
self.assertEqual(
pipeline_spec.components['comp-comp'].input_definitions
.parameters['string'].description, '')
self.assertIn(
'in_artifact',
pipeline_spec.components['comp-comp'].input_definitions.artifacts)
self.assertEqual(
pipeline_spec.components['comp-comp'].input_definitions
.artifacts['in_artifact'].description, '')
self.assertIn(
'Output',
pipeline_spec.components['comp-comp'].output_definitions.parameters)
self.assertEqual(
pipeline_spec.components['comp-comp'].output_definitions
.parameters['Output'].description, '')
self.assertIn(
'out_artifact',
pipeline_spec.components['comp-comp'].output_definitions.artifacts)
self.assertEqual(
pipeline_spec.components['comp-comp'].output_definitions
.artifacts['out_artifact'].description, '')
def test_google_style(self):
@dsl.component
def comp(
string: str,
in_artifact: Input[Artifact],
out_artifact: Output[Artifact],
) -> str:
"""Component description.
Args:
string: Component input string.
in_artifact: Component input artifact.
Returns:
Output: Component output string.
out_artifact: Component output artifact.
"""
return string
Outputs = NamedTuple(
'Outputs',
out_str=str,
out_artifact=Artifact,
)
@dsl.pipeline
def my_pipeline(
string: str,
in_artifact: Input[Artifact],
) -> Outputs:
"""Pipeline description.
Args:
string: Pipeline input string.
in_artifact: Pipeline input artifact.
Returns:
out_str: Pipeline output string.
out_artifact: Pipeline output artifact.
"""
t = comp(
string=string,
in_artifact=in_artifact,
)
return Outputs(
out_str=t.outputs['Output'],
out_artifact=t.outputs['out_artifact'])
pipeline_spec = my_pipeline.pipeline_spec
# test pipeline
# check the key with assertIn first to avoid false negatives from a wrong key and to make debugging easier
self.assertIn('string', pipeline_spec.root.input_definitions.parameters)
self.assertEqual(
pipeline_spec.root.input_definitions.parameters['string']
.description, 'Pipeline input string.')
self.assertIn('in_artifact',
pipeline_spec.root.input_definitions.artifacts)
self.assertEqual(
pipeline_spec.root.input_definitions.artifacts['in_artifact']
.description, 'Pipeline input artifact.')
self.assertIn('out_str',
pipeline_spec.root.output_definitions.parameters)
self.assertEqual(
pipeline_spec.root.output_definitions.parameters['out_str']
.description, 'Pipeline output string.')
self.assertIn('out_artifact',
pipeline_spec.root.output_definitions.artifacts)
self.assertEqual(
pipeline_spec.root.output_definitions.artifacts['out_artifact']
.description, 'Pipeline output artifact.')
# test component
# check the key with assertIn first to avoid false negatives from a wrong key and to make debugging easier
self.assertIn(
'string',
pipeline_spec.components['comp-comp'].input_definitions.parameters)
self.assertEqual(
pipeline_spec.components['comp-comp'].input_definitions
.parameters['string'].description, 'Component input string.')
self.assertIn(
'in_artifact',
pipeline_spec.components['comp-comp'].input_definitions.artifacts)
self.assertEqual(
pipeline_spec.components['comp-comp'].input_definitions
.artifacts['in_artifact'].description, 'Component input artifact.')
self.assertIn(
'Output',
pipeline_spec.components['comp-comp'].output_definitions.parameters)
self.assertEqual(
pipeline_spec.components['comp-comp'].output_definitions
.parameters['Output'].description, 'Component output string.')
self.assertIn(
'out_artifact',
pipeline_spec.components['comp-comp'].output_definitions.artifacts)
self.assertEqual(
pipeline_spec.components['comp-comp'].output_definitions
.artifacts['out_artifact'].description,
'Component output artifact.')
def test_inner_return_keywords_does_not_mess_up_extraction(self):
# we do string replacement for 'Return' and 'Returns', so we need to ensure those words appearing elsewhere do not interfere with extraction
@dsl.component
def comp(
string: str,
in_artifact: Input[Artifact],
out_artifact: Output[Artifact],
) -> str:
"""Return Component Returns description.
Args:
string: Component Return input string.
in_artifact: Component Returns input artifact.
Returns:
Output: Component output string.
out_artifact: Component output artifact.
"""
return string
Outputs = NamedTuple(
'Outputs',
out_str=str,
out_artifact=Artifact,
)
@dsl.pipeline
def my_pipeline(
string: str,
in_artifact: Input[Artifact],
) -> Outputs:
"""Pipeline description. Returns
Args:
string: Return Pipeline input string. Returns
in_artifact: Pipeline input Return artifact.
Returns:
out_str: Pipeline output string.
out_artifact: Pipeline output artifact.
"""
t = comp(
string=string,
in_artifact=in_artifact,
)
return Outputs(
out_str=t.outputs['Output'],
out_artifact=t.outputs['out_artifact'])
pipeline_spec = my_pipeline.pipeline_spec
# test pipeline
# check the key with assertIn first to avoid false negatives from a wrong key and to make debugging easier
self.assertIn('string', pipeline_spec.root.input_definitions.parameters)
self.assertEqual(
pipeline_spec.root.input_definitions.parameters['string']
.description, 'Return Pipeline input string. Returns')
self.assertIn('in_artifact',
pipeline_spec.root.input_definitions.artifacts)
self.assertEqual(
pipeline_spec.root.input_definitions.artifacts['in_artifact']
.description, 'Pipeline input Return artifact.')
self.assertIn('out_str',
pipeline_spec.root.output_definitions.parameters)
self.assertEqual(
pipeline_spec.root.output_definitions.parameters['out_str']
.description, 'Pipeline output string.')
self.assertIn('out_artifact',
pipeline_spec.root.output_definitions.artifacts)
self.assertEqual(
pipeline_spec.root.output_definitions.artifacts['out_artifact']
.description, 'Pipeline output artifact.')
# test component
# check the key with assertIn first to avoid false negatives from a wrong key and to make debugging easier
self.assertIn(
'string',
pipeline_spec.components['comp-comp'].input_definitions.parameters)
self.assertEqual(
pipeline_spec.components['comp-comp'].input_definitions
.parameters['string'].description, 'Component Return input string.')
self.assertIn(
'in_artifact',
pipeline_spec.components['comp-comp'].input_definitions.artifacts)
self.assertEqual(
pipeline_spec.components['comp-comp'].input_definitions
.artifacts['in_artifact'].description,
'Component Returns input artifact.')
self.assertIn(
'Output',
pipeline_spec.components['comp-comp'].output_definitions.parameters)
self.assertEqual(
pipeline_spec.components['comp-comp'].output_definitions
.parameters['Output'].description, 'Component output string.')
self.assertIn(
'out_artifact',
pipeline_spec.components['comp-comp'].output_definitions.artifacts)
self.assertEqual(
pipeline_spec.components['comp-comp'].output_definitions
.artifacts['out_artifact'].description,
'Component output artifact.')
if __name__ == '__main__':
unittest.main()


@ -362,6 +362,9 @@ def _build_component_spec_from_component_spec_structure(
input_name].parameter_type = pipeline_spec_pb2.ParameterType.STRUCT
component_spec.input_definitions.parameters[
input_name].is_optional = True
if input_spec.description:
component_spec.input_definitions.parameters[
input_name].description = input_spec.description
elif type_utils.is_parameter_type(input_spec.type):
component_spec.input_definitions.parameters[
@ -375,6 +378,9 @@ def _build_component_spec_from_component_spec_structure(
input_name=input_name,
default_value=input_spec.default,
)
if input_spec.description:
component_spec.input_definitions.parameters[
input_name].description = input_spec.description
else:
component_spec.input_definitions.artifacts[
@ -386,6 +392,9 @@ def _build_component_spec_from_component_spec_structure(
if input_spec.optional:
component_spec.input_definitions.artifacts[
input_name].is_optional = True
if input_spec.description:
component_spec.input_definitions.artifacts[
input_name].description = input_spec.description
for output_name, output_spec in (component_spec_struct.outputs or
{}).items():
@ -393,6 +402,9 @@ def _build_component_spec_from_component_spec_structure(
component_spec.output_definitions.parameters[
output_name].parameter_type = type_utils.get_parameter_type(
output_spec.type)
if output_spec.description:
component_spec.output_definitions.parameters[
output_name].description = output_spec.description
else:
component_spec.output_definitions.artifacts[
output_name].artifact_type.CopyFrom(
@ -400,6 +412,9 @@ def _build_component_spec_from_component_spec_structure(
output_spec.type))
component_spec.output_definitions.artifacts[
output_name].is_artifact_list = output_spec.is_artifact_list
if output_spec.description:
component_spec.output_definitions.artifacts[
output_name].description = output_spec.description
return component_spec
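
The builder hunks above copy the new description field onto the IR input/output definitions only when a description was extracted, so undocumented inputs and outputs keep the proto default (an empty string). A small sketch of the underlying proto field, assuming a kfp-pipeline-spec version that exposes description on the parameter and artifact specs (names and values illustrative):

from kfp.pipeline_spec import pipeline_spec_pb2

component_spec = pipeline_spec_pb2.ComponentSpec()
# Map access creates the entry; fields can then be set on the returned message.
param = component_spec.input_definitions.parameters['string']
param.parameter_type = pipeline_spec_pb2.ParameterType.STRING
param.description = 'Component input string.'
print(param.description)  # Component input string.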


@ -17,7 +17,7 @@ import itertools
import pathlib
import re
import textwrap
from typing import Callable, List, Optional, Tuple, Type, Union
from typing import Callable, List, Mapping, Optional, Tuple, Type, Union
import warnings
import docstring_parser
@ -349,6 +349,43 @@ def extract_component_interface(
component_name = name or _python_function_name_to_component_name(
func.__name__)
def assign_descriptions(
inputs_or_outputs: Mapping[str, Union[structures.InputSpec,
structures.OutputSpec]],
docstring_params: List[docstring_parser.DocstringParam],
) -> None:
"""Assigns descriptions to InputSpec or OutputSpec for each component
input/output found in the parsed docstring parameters."""
docstring_inputs = {param.arg_name: param for param in docstring_params}
for name, spec in inputs_or_outputs.items():
if name in docstring_inputs:
spec.description = docstring_inputs[name].description
def parse_docstring_with_return_as_args(
docstring: Union[str,
None]) -> Union[docstring_parser.Docstring, None]:
"""Modifies docstring so that a return section can be treated as an
args section, then parses the docstring."""
if docstring is None:
return None
# 'Returns' and 'Return' are the only two keywords docstring_parser recognizes for return sections
# include the trailing newline so we only match a real section header, not an inline use of the word
return_keywords = ['Returns:\n', 'Returns\n', 'Return:\n', 'Return\n']
for keyword in return_keywords:
if keyword in docstring:
modified_docstring = docstring.replace(keyword.strip(), 'Args:')
return docstring_parser.parse(modified_docstring)
return None
assign_descriptions(inputs, parsed_docstring.params)
modified_parsed_docstring = parse_docstring_with_return_as_args(
original_docstring)
if modified_parsed_docstring is not None:
assign_descriptions(outputs, modified_parsed_docstring.params)
description = get_pipeline_description(
decorator_description=description,
docstring=parsed_docstring,
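
The two helpers added above do the extraction: assign_descriptions copies each parsed docstring parameter's description onto the matching InputSpec or OutputSpec, and parse_docstring_with_return_as_args rewrites the Returns section header to Args so that docstring_parser reports each named return value as a parameter. A rough standalone sketch of that idea, assuming docstring_parser is installed (the docstring text is illustrative and the replacement is simplified relative to the keyword handling above):

import docstring_parser

docstring = '''Example component.

Args:
    string: Component input string.

Returns:
    Output: Component output string.
'''

# Inputs come from parsing the original docstring as usual.
input_params = docstring_parser.parse(docstring).params
print({p.arg_name: p.description for p in input_params})   # includes 'string'

# Rewriting the Returns header to Args makes docstring_parser report each named
# return value as a parameter, so its description can be read the same way.
output_params = docstring_parser.parse(docstring.replace('Returns:', 'Args:')).params
print({p.arg_name: p.description for p in output_params})  # includes 'Output'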


@ -45,12 +45,14 @@ class InputSpec:
default (optional): the default value for the input.
optional: Whether the input is optional. An input is optional when it has an explicit default value.
is_artifact_list: True if `type` represents a list of the artifact type. Only applies when `type` is an artifact.
description: Input description.
"""
type: Union[str, dict]
default: Optional[Any] = None
optional: bool = False
# This special flag for lists of artifacts allows type to be used the same way for list of artifacts and single artifacts. This is aligned with how IR represents lists of artifacts (same as for single artifacts), as well as simplifies downstream type handling/checking operations in the SDK since we don't need to parse the string `type` to determine if single artifact or list.
is_artifact_list: bool = False
description: Optional[str] = None
def __post_init__(self) -> None:
self._validate_type()
@ -146,10 +148,12 @@ class OutputSpec:
Attributes:
type: The type of the output.
is_artifact_list: True if `type` represents a list of the artifact type. Only applies when `type` is an artifact.
description: Output description.
"""
type: Union[str, dict]
# This special flag for lists of artifacts allows type to be used the same way for list of artifacts and single artifacts. This is aligned with how IR represents lists of artifacts (same as for single artifacts), as well as simplifies downstream type handling/checking operations in the SDK since we don't need to parse the string `type` to determine if single artifact or list.
is_artifact_list: bool = False
description: Optional[str] = None
def __post_init__(self) -> None:
self._validate_type()
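
For reference, a tiny sketch of the new description field on the dataclasses, assuming they are imported from kfp.components.structures (types and values illustrative):

from kfp.components import structures

inp = structures.InputSpec(type='String', description='Raw input string.')
out = structures.OutputSpec(type='String', description='The lower-cased string.')
print(inp.description, out.description)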


@ -0,0 +1,56 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kfp import compiler
from kfp import dsl
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import Output
@dsl.component
def dataset_joiner(
dataset_a: Input[Dataset],
dataset_b: Input[Dataset],
out_dataset: Output[Dataset],
) -> str:
"""Concatenate dataset_a and dataset_b.
Also returns the concatenated string.
Args:
dataset_a: First dataset.
dataset_b: Second dataset.
Returns:
out_dataset: The concatenated dataset.
Output: The concatenated string.
"""
with open(dataset_a.path) as f:
content_a = f.read()
with open(dataset_b.path) as f:
content_b = f.read()
concatenated_string = content_a + content_b
with open(out_dataset.path, 'w') as f:
f.write(concatenated_string)
return concatenated_string
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=dataset_joiner,
package_path=__file__.replace('.py', '.yaml'))


@ -0,0 +1,127 @@
# PIPELINE DEFINITION
# Name: dataset-joiner
# Description: Concatenate dataset_a and dataset_b.
# Also returns the concatenated string.
# Inputs:
# dataset_a: system.Dataset
# dataset_b: system.Dataset
# Outputs:
# Output: str
# out_dataset: system.Dataset
components:
comp-dataset-joiner:
executorLabel: exec-dataset-joiner
inputDefinitions:
artifacts:
dataset_a:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
description: First dataset.
dataset_b:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
description: Second dataset.
outputDefinitions:
artifacts:
out_dataset:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
description: The concatenated dataset.
parameters:
Output:
description: The concatenated string.
parameterType: STRING
deploymentSpec:
executors:
exec-dataset-joiner:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- dataset_joiner
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.14'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef dataset_joiner(\n dataset_a: Input[Dataset],\n dataset_b:\
\ Input[Dataset],\n out_dataset: Output[Dataset],\n) -> str:\n \"\"\
\"Concatenate dataset_a and dataset_b.\n\n Also returns the concatenated\
\ string.\n\n Args:\n dataset_a: First dataset.\n dataset_b:\
\ Second dataset.\n\n Returns:\n out_dataset: The concatenated\
\ dataset.\n Output: The concatenated string.\n \"\"\"\n with\
\ open(dataset_a.path) as f:\n content_a = f.read()\n\n with open(dataset_b.path)\
\ as f:\n content_b = f.read()\n\n concatenated_string = content_a\
\ + content_b\n with open(out_dataset.path, 'w') as f:\n f.write(concatenated_string)\n\
\n return concatenated_string\n\n"
image: python:3.7
pipelineInfo:
name: dataset-joiner
root:
dag:
outputs:
artifacts:
out_dataset:
artifactSelectors:
- outputArtifactKey: out_dataset
producerSubtask: dataset-joiner
parameters:
Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: dataset-joiner
tasks:
dataset-joiner:
cachingOptions:
enableCache: true
componentRef:
name: comp-dataset-joiner
inputs:
artifacts:
dataset_a:
componentInputArtifact: dataset_a
dataset_b:
componentInputArtifact: dataset_b
taskInfo:
name: dataset-joiner
inputDefinitions:
artifacts:
dataset_a:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
description: First dataset.
dataset_b:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
description: Second dataset.
outputDefinitions:
artifacts:
out_dataset:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
description: The concatenated dataset.
parameters:
Output:
description: The concatenated string.
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-beta.14


@ -0,0 +1,96 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kfp import compiler
from kfp import dsl
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import Output
@dsl.component
def str_to_dataset(string: str, dataset: Output[Dataset]):
"""Convert string to dataset.
Args:
string: The string.
Returns:
dataset: The dataset.
"""
with open(dataset.path, 'w') as f:
f.write(string)
@dsl.component
def dataset_joiner(
dataset_a: Input[Dataset],
dataset_b: Input[Dataset],
out_dataset: Output[Dataset],
) -> str:
"""Concatenate dataset_a and dataset_b.
Also returns the concatenated string.
Args:
dataset_a: First dataset.
dataset_b: Second dataset.
Returns:
out_dataset: The concatenated dataset.
Output: The concatenated string.
"""
with open(dataset_a.path) as f:
content_a = f.read()
with open(dataset_b.path) as f:
content_b = f.read()
concatenated_string = content_a + content_b
with open(out_dataset.path, 'w') as f:
f.write(concatenated_string)
return concatenated_string
@dsl.pipeline(
display_name='Concatenation pipeline',
description='A pipeline that joins string to in_dataset.',
)
def dataset_concatenator(
string: str,
in_dataset: Input[Dataset],
) -> Dataset:
"""Pipeline to convert string to a Dataset, then concatenate with
in_dataset.
Args:
string: String to concatenate to in_artifact.
in_dataset: Dataset to which to concatenate string.
Returns:
Output: The final concatenated dataset.
"""
first_dataset_task = str_to_dataset(string=string)
t = dataset_joiner(
dataset_a=first_dataset_task.outputs['dataset'],
dataset_b=in_dataset,
)
return t.outputs['out_dataset']
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=dataset_concatenator,
package_path=__file__.replace('.py', '.yaml'))


@ -0,0 +1,175 @@
# PIPELINE DEFINITION
# Name: dataset-concatenator
# Description: A pipeline that joins string to in_dataset.
# Inputs:
# in_dataset: system.Dataset
# string: str
# Outputs:
# Output: system.Dataset
components:
comp-dataset-joiner:
executorLabel: exec-dataset-joiner
inputDefinitions:
artifacts:
dataset_a:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
description: First dataset.
dataset_b:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
description: Second dataset.
outputDefinitions:
artifacts:
out_dataset:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
description: The concatenated dataset.
parameters:
Output:
description: The concatenated string.
parameterType: STRING
comp-str-to-dataset:
executorLabel: exec-str-to-dataset
inputDefinitions:
parameters:
string:
description: The string.
parameterType: STRING
outputDefinitions:
artifacts:
dataset:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
description: The dataset.
deploymentSpec:
executors:
exec-dataset-joiner:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- dataset_joiner
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.14'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef dataset_joiner(\n dataset_a: Input[Dataset],\n dataset_b:\
\ Input[Dataset],\n out_dataset: Output[Dataset],\n) -> str:\n \"\"\
\"Concatenate dataset_a and dataset_b.\n\n Also returns the concatenated\
\ string.\n\n Args:\n dataset_a: First dataset.\n dataset_b:\
\ Second dataset.\n\n Returns:\n out_dataset: The concatenated\
\ dataset.\n Output: The concatenated string.\n \"\"\"\n with\
\ open(dataset_a.path) as f:\n content_a = f.read()\n\n with open(dataset_b.path)\
\ as f:\n content_b = f.read()\n\n concatenated_string = content_a\
\ + content_b\n with open(out_dataset.path, 'w') as f:\n f.write(concatenated_string)\n\
\n return concatenated_string\n\n"
image: python:3.7
exec-str-to-dataset:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- str_to_dataset
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.14'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef str_to_dataset(string: str, dataset: Output[Dataset]):\n \"\
\"\"Convert string to dataset.\n\n Args:\n string: The string.\n\
\n Returns:\n dataset: The dataset.\n \"\"\"\n with open(dataset.path,\
\ 'w') as f:\n f.write(string)\n\n"
image: python:3.7
pipelineInfo:
description: A pipeline that joins string to in_dataset.
displayName: Concatenation pipeline
name: dataset-concatenator
root:
dag:
outputs:
artifacts:
Output:
artifactSelectors:
- outputArtifactKey: out_dataset
producerSubtask: dataset-joiner
tasks:
dataset-joiner:
cachingOptions:
enableCache: true
componentRef:
name: comp-dataset-joiner
dependentTasks:
- str-to-dataset
inputs:
artifacts:
dataset_a:
taskOutputArtifact:
outputArtifactKey: dataset
producerTask: str-to-dataset
dataset_b:
componentInputArtifact: in_dataset
taskInfo:
name: dataset-joiner
str-to-dataset:
cachingOptions:
enableCache: true
componentRef:
name: comp-str-to-dataset
inputs:
parameters:
string:
componentInputParameter: string
taskInfo:
name: str-to-dataset
inputDefinitions:
artifacts:
in_dataset:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
description: Dataset to which to concatenate string.
parameters:
string:
description: String to concatenate to in_artifact.
parameterType: STRING
outputDefinitions:
artifacts:
Output:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
description: The final concatenated dataset.
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-beta.14


@ -165,6 +165,9 @@ pipelines:
- module: pipeline_with_task_using_ignore_upstream_failure
name: my_pipeline
execute: false
- module: pipeline_with_metadata_fields
name: dataset_concatenator
execute: false
components:
test_data_dir: sdk/python/test_data/components
read: true
@ -225,6 +228,9 @@ components:
- module: container_with_placeholder_in_fstring
name: container_with_placeholder_in_fstring
execute: false
- module: component_with_metadata_fields
name: dataset_joiner
execute: false
v1_components:
test_data_dir: sdk/python/test_data/v1_component_yaml
read: true