fix(sdk): fixes type issues for ParallelFor. Fixes #9366 (#10436)

* fix type issues

* fix type issue

* fix format

* fix failed test

* fix format

* delete comments

* resolve comments

* resolve comments

* resolve format

* resolve import

* move unnecessary file

* resolve compiler_test failures

* resolve comments

* remove unnecessary imports

* fix format sort

* fix nits

* add new compiled yaml file

* solve merge conflicts

* solve conflicts

* format

* sort

* resolve comments

* resolve comments
This commit is contained in:
rickyxie0929 2024-02-08 10:34:37 -08:00 committed by GitHub
parent 87db18e3a1
commit fe04a5a842
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 874 additions and 13 deletions

View File

@ -11,7 +11,7 @@
## Deprecations
## Bug fixes and other changes
* Fix the compilation error when trying to iterate over a list of dictionaries with ParallelFor [\#10436](https://github.com/kubeflow/pipelines/pull/10436)
## Documentation updates
# 2.6.0

View File

@ -749,6 +749,41 @@ implementation:
pipeline_spec['root']['dag']['tasks']['for-loop-2']
['iteratorPolicy']['parallelismLimit'], 2)
def test_compile_parallel_for_with_incompatible_input_type(self):
@dsl.component
def producer_op(item: str) -> str:
return item
@dsl.component
def list_dict_maker() -> List[Dict[str, int]]:
return [{'a': 1, 'b': 2}, {'a': 2, 'b': 3}, {'a': 3, 'b': 4}]
with self.assertRaisesRegex(
type_utils.InconsistentTypeException,
"Incompatible argument passed to the input 'item' of component 'producer-op': Argument type 'NUMBER_INTEGER' is incompatible with the input type 'STRING'"
):
@dsl.pipeline
def my_pipeline(text: bool):
with dsl.ParallelFor(items=list_dict_maker().output) as item:
producer_task = producer_op(item=item.a)
def test_compile_parallel_for_with_relaxed_type_checking(self):
@dsl.component
def producer_op(item: str) -> str:
return item
@dsl.component
def list_dict_maker() -> List[Dict]:
return [{'a': 1, 'b': 2}, {'a': 2, 'b': 3}, {'a': 3, 'b': 4}]
@dsl.pipeline
def my_pipeline(text: bool):
with dsl.ParallelFor(items=list_dict_maker().output) as item:
producer_task = producer_op(item=item.a)
def test_compile_parallel_for_with_invalid_parallelism(self):
@dsl.component

View File

@ -67,6 +67,25 @@ def _get_subvar_type(type_name: str) -> Optional[str]:
return match['value_type'].lstrip().rstrip() if match else None
def _get_first_element_type(item_list: ItemList) -> str:
"""Returns the type of the first element of ItemList.
Args:
item_list: List of items to loop over. If a list of dicts then, all dicts must have the same keys.
Returns:
A string representing the type of the first element (e.g., "int", "Dict[str, int]").
"""
first_element = item_list[0]
if isinstance(first_element, dict):
key_type = type(list(
first_element.keys())[0]).__name__ # Get type of first key
value_type = type(list(
first_element.values())[0]).__name__ # Get type of first value
return f'Dict[{key_type}, {value_type}]'
else:
return type(first_element).__name__
def _make_name(code: str) -> str:
"""Makes a name for a loop argument from a unique code."""
return f'{LOOP_ITEM_PARAM_NAME_BASE}-{code}'
@ -162,7 +181,13 @@ class LoopParameterArgument(pipeline_channel.PipelineParameterChannel):
channel: pipeline_channel.PipelineParameterChannel,
) -> 'LoopParameterArgument':
"""Creates a LoopParameterArgument object from a
PipelineParameterChannel object."""
PipelineParameterChannel object.
Provide a flexible default channel_type ('String') if extraction
from PipelineParameterChannel is unsuccessful. This maintains
compilation progress in cases of unknown or missing type
information.
"""
return LoopParameterArgument(
items=channel,
name_override=channel.name + '-' + LOOP_ITEM_NAME_BASE,
@ -183,7 +208,7 @@ class LoopParameterArgument(pipeline_channel.PipelineParameterChannel):
return LoopParameterArgument(
items=raw_items,
name_code=name_code,
channel_type=type(raw_items[0]).__name__,
channel_type=_get_first_element_type(raw_items),
)
@ -302,7 +327,7 @@ class LoopArgumentVariable(pipeline_channel.PipelineParameterChannel):
self.subvar_name = subvar_name
self.loop_argument = loop_argument
# Handle potential channel_type extraction errors from LoopArgument by defaulting to 'String'. This maintains compilation progress.
super().__init__(
name=self._get_name_override(
loop_arg_name=loop_argument.name,

View File

@ -77,6 +77,35 @@ class ForLoopTest(parameterized.TestCase):
def test_get_subvar_type(self, dict_type, value_type):
self.assertEqual(for_loop._get_subvar_type(dict_type), value_type)
@parameterized.parameters(
{
'item_list': [
{
'A_a': 1
},
{
'A_a': 2
},
],
'value_type': 'Dict[str, int]',
},
{
'item_list': [1, 2, 3],
'value_type': 'int',
},
{
'item_list': ['a', 'b', 'c'],
'value_type': 'str',
},
{
'item_list': [2.3, 4.5, 3.5],
'value_type': 'float',
},
)
def test_get_first_element_type(self, item_list, value_type):
self.assertEqual(
for_loop._get_first_element_type(item_list), value_type)
@parameterized.parameters(
{
'channel':
@ -97,6 +126,16 @@ class ForLoopTest(parameterized.TestCase):
'expected_serialization_value':
'{{channel:task=task1;name=output1-loop-item;type=Dict[str, str];}}',
},
{
'channel':
pipeline_channel.PipelineParameterChannel(
name='output2',
channel_type='List[Dict]',
task_name='task1',
),
'expected_serialization_value':
'{{channel:task=task1;name=output2-loop-item;type=Dict;}}',
},
)
def test_loop_parameter_argument_from_pipeline_channel(
self, channel, expected_serialization_value):
@ -175,7 +214,7 @@ class ForLoopTest(parameterized.TestCase):
'name_code':
'2',
'expected_serialization_value':
'{{channel:task=;name=loop-item-param-2;type=dict;}}',
'{{channel:task=;name=loop-item-param-2;type=Dict[str, int];}}',
},
)
def test_loop_argument_from_raw_items(self, raw_items, name_code,

View File

@ -280,6 +280,15 @@ def verify_type_compatibility(
expected_type = expected_spec.type
given_type = _get_type_string_from_component_argument(given_value)
# avoid circular imports
from kfp.dsl import for_loop
# Workaround for potential type-checking issues during ParallelFor compilation: When LoopArgument or LoopArgumentVariable are involved and the expected type is 'String', we temporarily relax type enforcement to avoid blocking compilation. This is necessary due to potential information loss during the compilation step.
if isinstance(given_value,
(for_loop.LoopParameterArgument,
for_loop.LoopArgumentVariable)) and given_type == 'String':
return True
given_is_param = is_parameter_type(str(given_type))
if given_is_param:
given_type = get_parameter_type_name(given_type)

View File

@ -23,6 +23,7 @@ from kfp import components
from kfp import dsl
from kfp.dsl import base_component
from kfp.dsl import Dataset
from kfp.dsl import for_loop
from kfp.dsl import Input
from kfp.dsl import Output
from kfp.dsl import pipeline_channel
@ -713,6 +714,31 @@ class TestTypeChecking(parameterized.TestCase):
'is_compatible':
False,
},
{
'argument_value':
for_loop.LoopArgumentVariable(
loop_argument=for_loop.LoopParameterArgument
.from_pipeline_channel(
pipeline_channel.create_pipeline_channel(
'Output-loop-item', 'String',
'list-dict-without-type-maker-5')),
subvar_name='a'),
'parameter_input_spec':
structures.InputSpec('Integer'),
'is_compatible':
True,
},
{
'argument_value':
for_loop.LoopParameterArgument.from_pipeline_channel(
pipeline_channel.create_pipeline_channel(
'Output-loop-item', 'String',
'list-dict-without-type-maker-5')),
'parameter_input_spec':
structures.InputSpec('Integer'),
'is_compatible':
True,
},
)
def test_verify_type_compatibility(
self,

View File

@ -12,9 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
import os
import tempfile
from typing import Dict, List
from kfp import compiler
from kfp import components
from kfp import dsl
from kfp.dsl import component
@ -24,6 +27,58 @@ def print_text(msg: str):
print(msg)
@component
def print_int(x: int):
print(x)
@component
def list_dict_maker_0() -> List[Dict[str, int]]:
"""Enforces strict type checking - returns a list of dictionaries
where keys are strings and values are integers. For testing type
handling during compilation."""
return [{'a': 1, 'b': 2}, {'a': 2, 'b': 3}, {'a': 3, 'b': 4}]
@component
def list_dict_maker_1() -> List[Dict]:
"""Utilizes generic dictionary typing (no enforcement of specific key or
value types).
Tests flexibility in type handling.
"""
return [{'a': 1, 'b': 2}, {'a': 2, 'b': 3}, {'a': 3, 'b': 4}]
@component
def list_dict_maker_2() -> List[dict]:
"""Returns a list of dictionaries without type enforcement.
Tests flexibility in type handling.
"""
return [{'a': 1, 'b': 2}, {'a': 2, 'b': 3}, {'a': 3, 'b': 4}]
@component
def list_dict_maker_3() -> List:
"""Returns a basic list (no typing or structure guarantees).
Tests the limits of compiler type handling.
"""
return [{'a': 1, 'b': 2}, {'a': 2, 'b': 3}, {'a': 3, 'b': 4}]
with tempfile.TemporaryDirectory() as tmpdir:
pipeline_package_path = os.path.join(tmpdir, 'upstream_component.yaml')
compiler.Compiler().compile(
pipeline_func=list_dict_maker_1,
package_path=pipeline_package_path,
)
loaded_dict_maker = components.load_component_from_file(
pipeline_package_path)
@dsl.pipeline(name='pipeline-with-loops')
def my_pipeline(loop_parameter: List[str]):
@ -52,6 +107,33 @@ def my_pipeline(loop_parameter: List[str]):
print_text(msg=nested_item.A_a)
print_text(msg=nested_item.B_b)
# Loop argument that is a static dictionary known at compile time.
dict_loop_argument = [{'a': 1, 'b': 2}, {'a': 2, 'b': 3}, {'a': 3, 'b': 4}]
with dsl.ParallelFor(items=dict_loop_argument, parallelism=1) as item:
print_int(x=item.a)
# Loop argument that coming from the upstream component.
t_0 = list_dict_maker_0()
with dsl.ParallelFor(items=t_0.output) as item:
print_int(x=item.a)
t_1 = list_dict_maker_1()
with dsl.ParallelFor(items=t_1.output) as item:
print_int(x=item.a)
t_2 = list_dict_maker_2()
with dsl.ParallelFor(items=t_2.output) as item:
print_int(x=item.a)
t_3 = list_dict_maker_3()
with dsl.ParallelFor(items=t_3.output) as item:
print_int(x=item.a)
# Loop argument that coming from the upstream component compiled file.
t_4 = loaded_dict_maker()
with dsl.ParallelFor(items=t_4.output) as item:
print_int(x=item.a)
if __name__ == '__main__':
compiler.Compiler().compile(

View File

@ -36,6 +36,90 @@ components:
parameterType: LIST
pipelinechannel--loop_parameter-loop-item:
parameterType: STRING
comp-for-loop-10:
dag:
tasks:
print-int-3:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-int-3
inputs:
parameters:
x:
componentInputParameter: pipelinechannel--list-dict-maker-1-Output-loop-item
parameterExpressionSelector: parseJson(string_value)["a"]
taskInfo:
name: print-int-3
inputDefinitions:
parameters:
pipelinechannel--list-dict-maker-1-Output:
parameterType: LIST
pipelinechannel--list-dict-maker-1-Output-loop-item:
parameterType: STRUCT
comp-for-loop-11:
dag:
tasks:
print-int-4:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-int-4
inputs:
parameters:
x:
componentInputParameter: pipelinechannel--list-dict-maker-2-Output-loop-item
parameterExpressionSelector: parseJson(string_value)["a"]
taskInfo:
name: print-int-4
inputDefinitions:
parameters:
pipelinechannel--list-dict-maker-2-Output:
parameterType: LIST
pipelinechannel--list-dict-maker-2-Output-loop-item:
parameterType: STRUCT
comp-for-loop-12:
dag:
tasks:
print-int-5:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-int-5
inputs:
parameters:
x:
componentInputParameter: pipelinechannel--list-dict-maker-3-Output-loop-item
parameterExpressionSelector: parseJson(string_value)["a"]
taskInfo:
name: print-int-5
inputDefinitions:
parameters:
pipelinechannel--list-dict-maker-3-Output:
parameterType: LIST
pipelinechannel--list-dict-maker-3-Output-loop-item:
parameterType: STRING
comp-for-loop-13:
dag:
tasks:
print-int-6:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-int-6
inputs:
parameters:
x:
componentInputParameter: pipelinechannel--list-dict-maker-1-2-Output-loop-item
parameterExpressionSelector: parseJson(string_value)["a"]
taskInfo:
name: print-int-6
inputDefinitions:
parameters:
pipelinechannel--list-dict-maker-1-2-Output:
parameterType: LIST
pipelinechannel--list-dict-maker-1-2-Output-loop-item:
parameterType: STRING
comp-for-loop-2:
dag:
tasks:
@ -129,6 +213,112 @@ components:
parameters:
pipelinechannel--loop-item-param-5:
parameterType: STRUCT
comp-for-loop-8:
dag:
tasks:
print-int:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-int
inputs:
parameters:
x:
componentInputParameter: pipelinechannel--loop-item-param-7
parameterExpressionSelector: parseJson(string_value)["a"]
taskInfo:
name: print-int
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-7:
parameterType: STRUCT
comp-for-loop-9:
dag:
tasks:
print-int-2:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-int-2
inputs:
parameters:
x:
componentInputParameter: pipelinechannel--list-dict-maker-0-Output-loop-item
parameterExpressionSelector: parseJson(string_value)["a"]
taskInfo:
name: print-int-2
inputDefinitions:
parameters:
pipelinechannel--list-dict-maker-0-Output:
parameterType: LIST
pipelinechannel--list-dict-maker-0-Output-loop-item:
parameterType: STRUCT
comp-list-dict-maker-0:
executorLabel: exec-list-dict-maker-0
outputDefinitions:
parameters:
Output:
parameterType: LIST
comp-list-dict-maker-1:
executorLabel: exec-list-dict-maker-1
outputDefinitions:
parameters:
Output:
parameterType: LIST
comp-list-dict-maker-1-2:
executorLabel: exec-list-dict-maker-1-2
outputDefinitions:
parameters:
Output:
parameterType: LIST
comp-list-dict-maker-2:
executorLabel: exec-list-dict-maker-2
outputDefinitions:
parameters:
Output:
parameterType: LIST
comp-list-dict-maker-3:
executorLabel: exec-list-dict-maker-3
outputDefinitions:
parameters:
Output:
parameterType: LIST
comp-print-int:
executorLabel: exec-print-int
inputDefinitions:
parameters:
x:
parameterType: NUMBER_INTEGER
comp-print-int-2:
executorLabel: exec-print-int-2
inputDefinitions:
parameters:
x:
parameterType: NUMBER_INTEGER
comp-print-int-3:
executorLabel: exec-print-int-3
inputDefinitions:
parameters:
x:
parameterType: NUMBER_INTEGER
comp-print-int-4:
executorLabel: exec-print-int-4
inputDefinitions:
parameters:
x:
parameterType: NUMBER_INTEGER
comp-print-int-5:
executorLabel: exec-print-int-5
inputDefinitions:
parameters:
x:
parameterType: NUMBER_INTEGER
comp-print-int-6:
executorLabel: exec-print-int-6
inputDefinitions:
parameters:
x:
parameterType: NUMBER_INTEGER
comp-print-text:
executorLabel: exec-print-text
inputDefinitions:
@ -167,6 +357,330 @@ components:
parameterType: STRING
deploymentSpec:
executors:
exec-list-dict-maker-0:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- list_dict_maker_0
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef list_dict_maker_0() -> List[Dict[str, int]]:\n \"\"\"Enforces\
\ strict type checking - returns a list of dictionaries \n where keys\
\ are strings and values are integers. For testing type \n handling during\
\ compilation.\"\"\"\n return [{'a': 1, 'b': 2}, {'a': 2, 'b': 3}, {'a':\
\ 3, 'b': 4}]\n\n"
image: python:3.7
exec-list-dict-maker-1:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- list_dict_maker_1
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef list_dict_maker_1() -> List[Dict]:\n \"\"\"Utilizes generic\
\ dictionary typing (no enforcement of specific key or\n value types).\n\
\n Tests flexibility in type handling.\n \"\"\"\n return [{'a':\
\ 1, 'b': 2}, {'a': 2, 'b': 3}, {'a': 3, 'b': 4}]\n\n"
image: python:3.7
exec-list-dict-maker-1-2:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- list_dict_maker_1
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef list_dict_maker_1() -> List[Dict]:\n \"\"\"Utilizes generic\
\ dictionary typing (no enforcement of specific key or\n value types).\n\
\n Tests flexibility in type handling.\n \"\"\"\n return [{'a':\
\ 1, 'b': 2}, {'a': 2, 'b': 3}, {'a': 3, 'b': 4}]\n\n"
image: python:3.7
exec-list-dict-maker-2:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- list_dict_maker_2
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef list_dict_maker_2() -> List[dict]:\n \"\"\"Returns a list\
\ of dictionaries without type enforcement.\n\n Tests flexibility in\
\ type handling.\n \"\"\"\n return [{'a': 1, 'b': 2}, {'a': 2, 'b':\
\ 3}, {'a': 3, 'b': 4}]\n\n"
image: python:3.7
exec-list-dict-maker-3:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- list_dict_maker_3
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef list_dict_maker_3() -> List:\n \"\"\"Returns a basic list\
\ (no typing or structure guarantees).\n\n Tests the limits of compiler\
\ type handling.\n \"\"\"\n return [{'a': 1, 'b': 2}, {'a': 2, 'b':\
\ 3}, {'a': 3, 'b': 4}]\n\n"
image: python:3.7
exec-print-int:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_int
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_int(x: int):\n print(x)\n\n"
image: python:3.7
exec-print-int-2:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_int
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_int(x: int):\n print(x)\n\n"
image: python:3.7
exec-print-int-3:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_int
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_int(x: int):\n print(x)\n\n"
image: python:3.7
exec-print-int-4:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_int
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_int(x: int):\n print(x)\n\n"
image: python:3.7
exec-print-int-5:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_int
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_int(x: int):\n print(x)\n\n"
image: python:3.7
exec-print-int-6:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- print_int
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef print_int(x: int):\n print(x)\n\n"
image: python:3.7
exec-print-text:
container:
args:
@ -179,7 +693,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
@ -207,7 +721,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
@ -235,7 +749,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
@ -263,7 +777,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
@ -291,7 +805,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
@ -319,7 +833,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
@ -355,6 +869,74 @@ root:
inputParameter: pipelinechannel--loop_parameter
taskInfo:
name: for-loop-1
for-loop-10:
componentRef:
name: comp-for-loop-10
dependentTasks:
- list-dict-maker-1
inputs:
parameters:
pipelinechannel--list-dict-maker-1-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: list-dict-maker-1
parameterIterator:
itemInput: pipelinechannel--list-dict-maker-1-Output-loop-item
items:
inputParameter: pipelinechannel--list-dict-maker-1-Output
taskInfo:
name: for-loop-10
for-loop-11:
componentRef:
name: comp-for-loop-11
dependentTasks:
- list-dict-maker-2
inputs:
parameters:
pipelinechannel--list-dict-maker-2-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: list-dict-maker-2
parameterIterator:
itemInput: pipelinechannel--list-dict-maker-2-Output-loop-item
items:
inputParameter: pipelinechannel--list-dict-maker-2-Output
taskInfo:
name: for-loop-11
for-loop-12:
componentRef:
name: comp-for-loop-12
dependentTasks:
- list-dict-maker-3
inputs:
parameters:
pipelinechannel--list-dict-maker-3-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: list-dict-maker-3
parameterIterator:
itemInput: pipelinechannel--list-dict-maker-3-Output-loop-item
items:
inputParameter: pipelinechannel--list-dict-maker-3-Output
taskInfo:
name: for-loop-12
for-loop-13:
componentRef:
name: comp-for-loop-13
dependentTasks:
- list-dict-maker-1-2
inputs:
parameters:
pipelinechannel--list-dict-maker-1-2-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: list-dict-maker-1-2
parameterIterator:
itemInput: pipelinechannel--list-dict-maker-1-2-Output-loop-item
items:
inputParameter: pipelinechannel--list-dict-maker-1-2-Output
taskInfo:
name: for-loop-13
for-loop-4:
componentRef:
name: comp-for-loop-4
@ -364,9 +946,72 @@ root:
raw: '[{"A_a": "1", "B_b": "2"}, {"A_a": "10", "B_b": "20"}]'
taskInfo:
name: for-loop-4
for-loop-8:
componentRef:
name: comp-for-loop-8
iteratorPolicy:
parallelismLimit: 1
parameterIterator:
itemInput: pipelinechannel--loop-item-param-7
items:
raw: '[{"a": 1, "b": 2}, {"a": 2, "b": 3}, {"a": 3, "b": 4}]'
taskInfo:
name: for-loop-8
for-loop-9:
componentRef:
name: comp-for-loop-9
dependentTasks:
- list-dict-maker-0
inputs:
parameters:
pipelinechannel--list-dict-maker-0-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: list-dict-maker-0
parameterIterator:
itemInput: pipelinechannel--list-dict-maker-0-Output-loop-item
items:
inputParameter: pipelinechannel--list-dict-maker-0-Output
taskInfo:
name: for-loop-9
list-dict-maker-0:
cachingOptions:
enableCache: true
componentRef:
name: comp-list-dict-maker-0
taskInfo:
name: list-dict-maker-0
list-dict-maker-1:
cachingOptions:
enableCache: true
componentRef:
name: comp-list-dict-maker-1
taskInfo:
name: list-dict-maker-1
list-dict-maker-1-2:
cachingOptions:
enableCache: true
componentRef:
name: comp-list-dict-maker-1-2
taskInfo:
name: list-dict-maker-1-2
list-dict-maker-2:
cachingOptions:
enableCache: true
componentRef:
name: comp-list-dict-maker-2
taskInfo:
name: list-dict-maker-2
list-dict-maker-3:
cachingOptions:
enableCache: true
componentRef:
name: comp-list-dict-maker-3
taskInfo:
name: list-dict-maker-3
inputDefinitions:
parameters:
loop_parameter:
parameterType: LIST
schemaVersion: 2.1.0
sdkVersion: kfp-2.1.3
sdkVersion: kfp-2.6.0