diff --git a/sdk/python/kfp/local/docker_task_handler_test.py b/sdk/python/kfp/local/docker_task_handler_test.py index 06e3e8a18c..cd083c92fa 100755 --- a/sdk/python/kfp/local/docker_task_handler_test.py +++ b/sdk/python/kfp/local/docker_task_handler_test.py @@ -11,7 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import contextlib import os +import pathlib from typing import Optional import unittest from unittest import mock @@ -20,6 +22,7 @@ import docker from kfp import dsl from kfp import local from kfp.dsl import Artifact +from kfp.dsl import Dataset from kfp.dsl import Output from kfp.local import docker_task_handler from kfp.local import testing_utilities @@ -43,9 +46,10 @@ class DockerMockTestCase(unittest.TestCase): ] mock_container.wait.return_value = {'StatusCode': 0} - def teardown(self): + @classmethod + def teardown(cls): super().tearDown() - self.docker_mock.reset_mock() + cls.docker_mock.reset_mock() class TestRunDockerContainer(DockerMockTestCase): @@ -165,8 +169,10 @@ class TestAddLatestTagIfNotPresent(unittest.TestCase): self.assertEqual(actual, expected) -class TestE2E(DockerMockTestCase, - testing_utilities.LocalRunnerEnvironmentTestCase): +class TestE2E( + DockerMockTestCase, + testing_utilities.MockedDatetimeTestCase, +): def setUp(self): super().setUp() @@ -179,11 +185,8 @@ class TestE2E(DockerMockTestCase, with open(a.path, 'w') as f: f.write(x) - try: - artifact_maker(x='foo') - # cannot get outputs if they aren't created due to mock - except FileNotFoundError: - pass + # NOTE: outputs cannot be collected since run is mocked + artifact_maker(x='foo') run_mock = self.mocked_docker_client.containers.run run_mock.assert_called_once() @@ -209,11 +212,8 @@ class TestE2E(DockerMockTestCase, def comp(): return dsl.ContainerSpec(image='alpine') - try: - comp() - # cannot get outputs if they aren't created due to mock - except FileNotFoundError: - pass + # NOTE: outputs cannot be collected since run is mocked + comp() run_mock = self.mocked_docker_client.containers.run run_mock.assert_called_once() @@ -233,11 +233,8 @@ class TestE2E(DockerMockTestCase, command=['sh', '-c', f'echo prefix-{x}'], ) - try: - artifact_maker(x='foo') - # cannot get outputs if they aren't created due to mock - except FileNotFoundError: - pass + # NOTE: outputs cannot be collected since run is mocked + artifact_maker(x='foo') run_mock = self.mocked_docker_client.containers.run run_mock.assert_called_once() @@ -526,6 +523,102 @@ class TestE2E(DockerMockTestCase, ) self.assertEqual(kwargs['command'], ['echo', 'another thing']) + def test_all_params(self): + local.init(runner=local.DockerRunner()) + + @dsl.container_component + def comp( + str_in: str, + int_in: int, + float_in: float, + bool_in: bool, + dict_in: dict, + list_in: list, + str_out: dsl.OutputPath(str), + int_out: dsl.OutputPath(int), + float_out: dsl.OutputPath(float), + bool_out: dsl.OutputPath(bool), + dict_out: dsl.OutputPath(dict), + list_out: dsl.OutputPath(list), + ): + return dsl.ContainerSpec( + image='alpine', + command=[ + 'sh', + '-c', + ], + args=[ + 'echo "{{$}}" && ' + f'mkdir -p $(dirname {str_out}) && echo -n {str_in} > {str_out} && ' + f'mkdir -p $(dirname {int_out}) && echo -n {int_in} > {int_out} && ' + f'mkdir -p $(dirname {float_out}) && echo -n {float_in} > {float_out} && ' + f'mkdir -p $(dirname {bool_out}) && echo -n {bool_in} > {bool_out} && ' + f"mkdir -p $(dirname {dict_out}) && echo -n '{dict_in}' > {dict_out} && " + f"mkdir -p $(dirname {list_out}) && echo -n '{list_in}' > {list_out}" + ], + ) + + run_mock = self.mocked_docker_client.containers.run + + # NOTE: outputs cannot be collected since run is mocked + with contextlib.suppress(KeyError): + comp( + str_in='foo', + int_in=100, + float_in=2.718, + bool_in=False, + dict_in={'x': 'y'}, + list_in=['a', 'b', 'c'], + ) + run_mock.assert_called_once() + kwargs = run_mock.call_args[1] + run_mock.reset_mock() + project_root = str(pathlib.Path(__file__).resolve().parents[4]) + self.assertEqual(kwargs['command'], [ + 'sh', + '-c', + 'echo "{"inputs": {"parameterValues": {"str_in": "foo", "int_in": 100, "float_in": 2.718, "bool_in": false, "dict_in": {"x": "y"}, "list_in": ["a", "b", "c"]}}, "outputs": {"parameters": {"str_out": {"outputFile": "REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/str_out"}, "int_out": {"outputFile": "REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/int_out"}, "float_out": {"outputFile": "REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/float_out"}, "bool_out": {"outputFile": "REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/bool_out"}, "dict_out": {"outputFile": "REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/dict_out"}, "list_out": {"outputFile": "REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/list_out"}}, "outputFile": "REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/executor_output.json"}}" && mkdir -p $(dirname REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/str_out) && echo -n foo > REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/str_out && mkdir -p $(dirname REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/int_out) && echo -n 100 > REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/int_out && mkdir -p $(dirname REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/float_out) && echo -n 2.718 > REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/float_out && mkdir -p $(dirname REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/bool_out) && echo -n false > REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/bool_out && mkdir -p $(dirname REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/dict_out) && echo -n \'{"x": "y"}\' > REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/dict_out && mkdir -p $(dirname REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/list_out) && echo -n \'["a", "b", "c"]\' > REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/list_out' + .replace('REPLACE_PATH', project_root), + ]) + + def test_output_artifact(self): + local.init(runner=local.DockerRunner()) + + @dsl.container_component + def comp( + text: str, + dataset: Output[Dataset], + ): + return dsl.ContainerSpec( + image='alpine', + command=[ + 'sh', + '-c', + ], + args=[ + 'echo "{{$}}" && ' + f'mkdir -p $(dirname {dataset.path}) && echo -n {text} > {dataset.path}' + ], + ) + + run_mock = self.mocked_docker_client.containers.run + + # NOTE: outputs cannot be collected since run is mocked + with contextlib.suppress(KeyError): + comp(text='foo') + run_mock.assert_called_once() + kwargs = run_mock.call_args[1] + run_mock.reset_mock() + project_root = str(pathlib.Path(__file__).resolve().parents[4]) + self.assertEqual( + kwargs['command'], + [ + 'sh', '-c', + 'echo "{"inputs": {"parameterValues": {"text": "foo"}}, "outputs": {"artifacts": {"dataset": {"artifacts": [{"name": "dataset", "type": {"schemaTitle": "system.Dataset", "schemaVersion": "0.0.1"}, "uri": "REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/dataset", "metadata": {}}]}}, "outputFile": "REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/executor_output.json"}}" && mkdir -p $(dirname REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/dataset) && echo -n foo > REPLACE_PATH/local_outputs/comp-2023-10-10-13-32-59-420710/comp/dataset' + .replace('REPLACE_PATH', project_root) + ], + ) + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/local/executor_output_utils.py b/sdk/python/kfp/local/executor_output_utils.py index 716eb405e6..c2f209fdf4 100644 --- a/sdk/python/kfp/local/executor_output_utils.py +++ b/sdk/python/kfp/local/executor_output_utils.py @@ -129,7 +129,7 @@ def special_dsl_outputpath_read( return value except Exception as e: raise ValueError( - f'Could not deserialize output {parameter_name!r} from path {output_file}' + f'Could not deserialize output {parameter_name!r} with value {value!r} from path {output_file}' ) from e diff --git a/sdk/python/kfp/local/testing_utilities.py b/sdk/python/kfp/local/testing_utilities.py index 7b4324ba75..2ef013ac50 100755 --- a/sdk/python/kfp/local/testing_utilities.py +++ b/sdk/python/kfp/local/testing_utilities.py @@ -18,7 +18,6 @@ import functools import os import pathlib import tempfile -from typing import Any, Callable, Dict import unittest from unittest import mock @@ -28,7 +27,6 @@ from google.protobuf import message from kfp import components from kfp import dsl from kfp.local import config as local_config -from kfp.local import docker_task_handler _LOCAL_KFP_PACKAGE_PATH = os.path.join( os.path.dirname(__file__), @@ -37,23 +35,6 @@ _LOCAL_KFP_PACKAGE_PATH = os.path.join( ) -def modify_volumes_decorator( - original_method: Callable[..., Any]) -> Callable[..., Any]: - - def wrapper(self, *args, **kwargs) -> Dict[str, Any]: - original_volumes = original_method(self, *args, **kwargs) - LOCAL_KFP_VOLUME = { - _LOCAL_KFP_PACKAGE_PATH: { - 'bind': _LOCAL_KFP_PACKAGE_PATH, - 'mode': 'rw' - } - } - original_volumes.update(LOCAL_KFP_VOLUME) - return original_volumes - - return wrapper - - class LocalRunnerEnvironmentTestCase(parameterized.TestCase): """Test class that uses an isolated filesystem and updates the dsl.component decorator to install from the local KFP source, rather than @@ -68,19 +49,11 @@ class LocalRunnerEnvironmentTestCase(parameterized.TestCase): self.temp_dir = tempfile.TemporaryDirectory() os.chdir(self.temp_dir.name) - # ENTER: mount KFP dir to enable install from source for docker runner - self.original_get_volumes_to_mount = docker_task_handler.DockerTaskHandler.get_volumes_to_mount - docker_task_handler.DockerTaskHandler.get_volumes_to_mount = modify_volumes_decorator( - docker_task_handler.DockerTaskHandler.get_volumes_to_mount) - def tearDown(self): # EXIT: use tempdir for all tests self.temp_dir.cleanup() os.chdir(self.working_dir) - # EXIT: mount KFP dir to enable install from source for docker runner - docker_task_handler.DockerTaskHandler.get_volumes_to_mount = self.original_get_volumes_to_mount - @classmethod def setUpClass(cls): # ENTER: use local KFP package path for subprocess runner