414 lines
15 KiB
Python
414 lines
15 KiB
Python
# Copyright 2020 The Kubeflow Authors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import tempfile
|
|
import textwrap
|
|
import unittest
|
|
import warnings
|
|
import kfp
|
|
from pathlib import Path
|
|
from kfp.components import load_component_from_text, create_component_from_func
|
|
from kfp.dsl.types import InconsistentTypeException
|
|
from kfp.dsl import PipelineParam
|
|
|
|
|
|
class TestComponentBridge(unittest.TestCase):
|
|
# Alternatively, we could use kfp.dsl.Pipleine().__enter__ and __exit__
|
|
def setUp(self):
|
|
self.old_container_task_constructor = kfp.components._components._container_task_constructor
|
|
kfp.components._components._container_task_constructor = kfp.dsl._component_bridge._create_container_op_from_component_and_arguments
|
|
|
|
def tearDown(self):
|
|
kfp.components._components._container_task_constructor = self.old_container_task_constructor
|
|
|
|
def test_conversion_to_container_op(self):
|
|
component_text = textwrap.dedent('''\
|
|
name: Custom component
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
''')
|
|
task_factory1 = load_component_from_text(component_text)
|
|
task1 = task_factory1()
|
|
|
|
self.assertEqual(task1.human_name, 'Custom component')
|
|
|
|
def test_passing_env_to_container_op(self):
|
|
component_text = textwrap.dedent('''\
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
env:
|
|
key1: value 1
|
|
key2: value 2
|
|
''')
|
|
task_factory1 = load_component_from_text(component_text)
|
|
|
|
task1 = task_factory1()
|
|
actual_env = {
|
|
env_var.name: env_var.value for env_var in task1.container.env
|
|
}
|
|
expected_env = {'key1': 'value 1', 'key2': 'value 2'}
|
|
self.assertDictEqual(expected_env, actual_env)
|
|
|
|
def test_input_path_placeholder_with_constant_argument(self):
|
|
component_text = textwrap.dedent('''\
|
|
inputs:
|
|
- {name: input 1}
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command:
|
|
- --input-data
|
|
- {inputPath: input 1}
|
|
''')
|
|
task_factory1 = load_component_from_text(component_text)
|
|
task1 = task_factory1('Text')
|
|
|
|
self.assertEqual(
|
|
task1.command,
|
|
['--input-data', task1.input_artifact_paths['input 1']])
|
|
self.assertEqual(task1.artifact_arguments, {'input 1': 'Text'})
|
|
|
|
def test_passing_component_metadata_to_container_op(self):
|
|
component_text = textwrap.dedent('''\
|
|
metadata:
|
|
annotations:
|
|
key1: value1
|
|
labels:
|
|
key1: value1
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
''')
|
|
task_factory1 = load_component_from_text(text=component_text)
|
|
|
|
task1 = task_factory1()
|
|
self.assertEqual(task1.pod_annotations['key1'], 'value1')
|
|
self.assertEqual(task1.pod_labels['key1'], 'value1')
|
|
|
|
def test_volatile_components(self):
|
|
component_text = textwrap.dedent('''\
|
|
metadata:
|
|
annotations:
|
|
volatile_component: "true"
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
''')
|
|
task_factory1 = load_component_from_text(text=component_text)
|
|
|
|
task1 = task_factory1()
|
|
self.assertEqual(
|
|
task1.execution_options.caching_strategy.max_cache_staleness, 'P0D')
|
|
|
|
def test_type_compatibility_check_not_failing_when_disabled(self):
|
|
component_a = textwrap.dedent('''\
|
|
outputs:
|
|
- {name: out1, type: type_A}
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command: [bash, -c, 'mkdir -p "$(dirname "$0")"; date > "$0"', {outputPath: out1}]
|
|
''')
|
|
component_b = textwrap.dedent('''\
|
|
inputs:
|
|
- {name: in1, type: type_Z}
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command: [echo, {inputValue: in1}]
|
|
''')
|
|
kfp.TYPE_CHECK = False
|
|
task_factory_a = load_component_from_text(component_a)
|
|
task_factory_b = load_component_from_text(component_b)
|
|
a_task = task_factory_a()
|
|
b_task = task_factory_b(in1=a_task.outputs['out1'])
|
|
kfp.TYPE_CHECK = True
|
|
|
|
def test_type_compatibility_check_not_failing_when_type_is_ignored(self):
|
|
component_a = textwrap.dedent('''\
|
|
outputs:
|
|
- {name: out1, type: type_A}
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command: [bash, -c, 'mkdir -p "$(dirname "$0")"; date > "$0"', {outputPath: out1}]
|
|
''')
|
|
component_b = textwrap.dedent('''\
|
|
inputs:
|
|
- {name: in1, type: type_Z}
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command: [echo, {inputValue: in1}]
|
|
''')
|
|
task_factory_a = load_component_from_text(component_a)
|
|
task_factory_b = load_component_from_text(component_b)
|
|
a_task = task_factory_a()
|
|
b_task = task_factory_b(in1=a_task.outputs['out1'].ignore_type())
|
|
|
|
def test_end_to_end_python_component_pipeline_compilation(self):
|
|
import kfp.components as comp
|
|
|
|
#Defining the Python function
|
|
def add(a: float, b: float) -> float:
|
|
"""Returns sum of two arguments."""
|
|
return a + b
|
|
|
|
with tempfile.TemporaryDirectory() as temp_dir_name:
|
|
add_component_file = str(
|
|
Path(temp_dir_name).joinpath('add.component.yaml'))
|
|
|
|
#Converting the function to a component. Instantiate it to create a pipeline task (ContaineOp instance)
|
|
add_op = comp.func_to_container_op(
|
|
add,
|
|
base_image='python:3.5',
|
|
output_component_file=add_component_file)
|
|
|
|
#Checking that the component artifact is usable:
|
|
add_op2 = comp.load_component_from_file(add_component_file)
|
|
|
|
#Building the pipeline
|
|
@kfp.dsl.pipeline(
|
|
name='Calculation pipeline',
|
|
description='A pipeline that performs arithmetic calculations.')
|
|
def calc_pipeline(
|
|
a1,
|
|
a2='7',
|
|
a3='17',
|
|
):
|
|
task_1 = add_op(a1, a2)
|
|
task_2 = add_op2(a1, a2)
|
|
task_3 = add_op(task_1.output, task_2.output)
|
|
task_4 = add_op2(task_3.output, a3)
|
|
|
|
#Compiling the pipleine:
|
|
pipeline_filename = str(
|
|
Path(temp_dir_name).joinpath(calc_pipeline.__name__ +
|
|
'.pipeline.tar.gz'))
|
|
kfp.compiler.Compiler().compile(calc_pipeline, pipeline_filename)
|
|
|
|
def test_handling_list_arguments_containing_pipelineparam(self):
|
|
"""Checks that lists containing PipelineParam can be properly
|
|
serialized."""
|
|
|
|
def consume_list(list_param: list) -> int:
|
|
pass
|
|
|
|
import kfp
|
|
task_factory = create_component_from_func(consume_list)
|
|
task = task_factory([1, 2, 3, kfp.dsl.PipelineParam('aaa'), 4, 5, 6])
|
|
|
|
full_command_line = task.command + task.arguments
|
|
for arg in full_command_line:
|
|
self.assertNotIn('PipelineParam', arg)
|
|
|
|
def test_converted_outputs(self):
|
|
component_text = textwrap.dedent('''\
|
|
outputs:
|
|
- name: Output 1
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command:
|
|
- producer
|
|
- {outputPath: Output 1} # Outputs must be used in the implementation
|
|
''')
|
|
task_factory1 = load_component_from_text(component_text)
|
|
task1 = task_factory1()
|
|
|
|
self.assertSetEqual(set(task1.outputs.keys()), {'Output 1', 'output_1'})
|
|
self.assertIsNotNone(task1.output)
|
|
|
|
def test_reusable_component_warnings(self):
|
|
op1 = load_component_from_text('''\
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
''')
|
|
with warnings.catch_warnings(record=True) as warning_messages:
|
|
op1()
|
|
deprecation_messages = list(
|
|
str(message)
|
|
for message in warning_messages
|
|
if message.category == DeprecationWarning)
|
|
self.assertListEqual(deprecation_messages, [])
|
|
|
|
with self.assertWarnsRegex(FutureWarning, expected_regex='reusable'):
|
|
kfp.dsl.ContainerOp(name='name', image='image')
|
|
|
|
with self.assertWarnsRegex(FutureWarning, expected_regex='reusable'):
|
|
kfp.dsl.ContainerOp(
|
|
name='name',
|
|
image='image',
|
|
arguments=[PipelineParam('param1'),
|
|
PipelineParam('param2')])
|
|
|
|
def test_prevent_passing_container_op_as_argument(self):
|
|
component_text = textwrap.dedent('''\
|
|
inputs:
|
|
- {name: input 1}
|
|
- {name: input 2}
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command:
|
|
- prog
|
|
- {inputValue: input 1}
|
|
- {inputPath: input 2}
|
|
''')
|
|
component = load_component_from_text(component_text)
|
|
# Passing normal values to component
|
|
task1 = component(input_1="value 1", input_2="value 2")
|
|
# Passing unserializable values to component
|
|
with self.assertRaises(TypeError):
|
|
component(input_1=task1, input_2="value 2")
|
|
with self.assertRaises(TypeError):
|
|
component(input_1="value 1", input_2=task1)
|
|
|
|
def test_pythonic_container_output_handled_by_graph(self):
|
|
component_a = textwrap.dedent('''\
|
|
inputs: []
|
|
outputs:
|
|
- {name: out1, type: str}
|
|
implementation:
|
|
graph:
|
|
tasks:
|
|
some-container:
|
|
arguments: {}
|
|
componentRef:
|
|
spec:
|
|
outputs:
|
|
- {name: out1, type: str}
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command: [bash, -c, 'mkdir -p "$(dirname "$0")"; date > "$0"', {outputPath: out1}]
|
|
outputValues:
|
|
out1:
|
|
taskOutput:
|
|
taskId: some-container
|
|
outputName: out1
|
|
''')
|
|
component_b = textwrap.dedent('''\
|
|
inputs:
|
|
- {name: in1, type: str}
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command: [echo, {inputValue: in1}]
|
|
''')
|
|
task_factory_a = load_component_from_text(component_a)
|
|
task_factory_b = load_component_from_text(component_b)
|
|
a_task = task_factory_a()
|
|
b_task = task_factory_b(in1=a_task.outputs['out1'])
|
|
|
|
def test_nonpythonic_container_output_handled_by_graph(self):
|
|
component_a = textwrap.dedent('''\
|
|
inputs: []
|
|
outputs:
|
|
- {name: out1, type: str}
|
|
implementation:
|
|
graph:
|
|
tasks:
|
|
some-container:
|
|
arguments: {}
|
|
componentRef:
|
|
spec:
|
|
outputs:
|
|
- {name: out-1, type: str}
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command: [bash, -c, 'mkdir -p "$(dirname "$0")"; date > "$0"', {outputPath: out-1}]
|
|
outputValues:
|
|
out1:
|
|
taskOutput:
|
|
taskId: some-container
|
|
outputName: out-1
|
|
''')
|
|
component_b = textwrap.dedent('''\
|
|
inputs:
|
|
- {name: in1, type: str}
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command: [echo, {inputValue: in1}]
|
|
''')
|
|
task_factory_a = load_component_from_text(component_a)
|
|
task_factory_b = load_component_from_text(component_b)
|
|
a_task = task_factory_a()
|
|
b_task = task_factory_b(in1=a_task.outputs['out1'])
|
|
|
|
# v2 tests
|
|
|
|
def test_input_output_uri_resolving(self):
|
|
component_text = textwrap.dedent('''\
|
|
inputs:
|
|
- {name: In1}
|
|
outputs:
|
|
- {name: Out1}
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command:
|
|
- program
|
|
- --in1-uri
|
|
- {inputUri: In1}
|
|
- --out1-uri
|
|
- {outputUri: Out1}
|
|
''')
|
|
op = load_component_from_text(text=component_text)
|
|
task = op(in1='foo')
|
|
|
|
self.assertEqual([
|
|
'program',
|
|
'--in1-uri',
|
|
'{{$.inputs.artifacts[\'In1\'].uri}}',
|
|
'--out1-uri',
|
|
'{{$.outputs.artifacts[\'Out1\'].uri}}',
|
|
], task.command)
|
|
|
|
def test_convert_executor_input_and_output_metadata_placeholder(self):
|
|
test_component = textwrap.dedent("""\
|
|
inputs:
|
|
- {name: in1}
|
|
outputs:
|
|
- {name: out1}
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command: [echo, {executorInput}, {outputMetadata}]
|
|
""")
|
|
task_factory = load_component_from_text(test_component)
|
|
task = task_factory(in1='foo')
|
|
self.assertListEqual(
|
|
['echo', '{{$}}', '/tmp/outputs/executor_output.json'],
|
|
task.command)
|
|
|
|
def test_fail_executor_input_with_key(self):
|
|
test_component = textwrap.dedent("""\
|
|
inputs:
|
|
- {name: in1}
|
|
outputs:
|
|
- {name: out1}
|
|
implementation:
|
|
container:
|
|
image: busybox
|
|
command: [echo, {executorInput: a_bad_key}]
|
|
""")
|
|
with self.assertRaises(TypeError):
|
|
_ = load_component_from_text(test_component)
|