feat(sdk): support local Container Component execution #localexecution (#10333)

* support Container Components

* address review feedback
This commit is contained in:
Connor McCarthy 2023-12-20 13:34:20 -05:00 committed by GitHub
parent f52ba56784
commit 846f88770c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 318 additions and 70 deletions

View File

@ -14,7 +14,6 @@
import os
from typing import Any, Dict, List
from kfp.dsl import component_factory
from kfp.local import config
from kfp.local import status
from kfp.local import task_handler_interface
@ -30,8 +29,6 @@ class DockerTaskHandler(task_handler_interface.ITaskHandler):
pipeline_root: str,
runner: config.DockerRunner,
) -> None:
# TODO: remove when full placeholder support is added
self.validate_not_container_component(full_command)
self.image = image
self.full_command = full_command
self.pipeline_root = pipeline_root
@ -50,9 +47,11 @@ class DockerTaskHandler(task_handler_interface.ITaskHandler):
def run(self) -> status.Status:
"""Runs the Docker container and returns the status."""
# nest docker import in case not available in user env so that
# this module is runnable, even if not using DockerRunner
import docker
client = docker.from_env()
try:
import docker
client = docker.from_env()
volumes = self.get_volumes_to_mount()
return_code = run_docker_container(
client=client,
@ -64,23 +63,11 @@ class DockerTaskHandler(task_handler_interface.ITaskHandler):
client.close()
return status.Status.SUCCESS if return_code == 0 else status.Status.FAILURE
def validate_not_container_component(
self,
full_command: List[str],
) -> None:
if not any(component_factory.EXECUTOR_MODULE in part
for part in full_command):
raise RuntimeError(
f'The {config.DockerRunner.__name__} only supports running Lightweight Python Components. You are attempting to run a Container Component.'
)
def pull_image(client: 'docker.DockerClient', image: str) -> None:
if ':' in image:
repository, tag = image.split(':')
else:
repository, tag = image, 'latest'
client.images.pull(repository=repository, tag=tag)
def add_latest_tag_if_not_present(image: str) -> str:
if ':' not in image:
image = f'{image}:latest'
return image
def run_docker_container(
@ -89,14 +76,16 @@ def run_docker_container(
command: List[str],
volumes: Dict[str, Any],
) -> int:
image = add_latest_tag_if_not_present(image=image)
image_exists = any(
image in existing_image.tags for existing_image in client.images.list())
if image_exists:
print(f'Found image {image!r}')
print(f'Found image {image!r}\n')
else:
print(f'Pulling image {image!r}')
pull_image(client, image)
print('Image pull complete')
repository, tag = image.split(':')
client.images.pull(repository=repository, tag=tag)
print('Image pull complete\n')
container = client.containers.run(
image=image,
command=command,

View File

@ -32,6 +32,16 @@ class DockerMockTestCase(unittest.TestCase):
patcher = mock.patch('docker.from_env')
self.mocked_docker_client = patcher.start().return_value
mock_container = mock.Mock()
self.mocked_docker_client.containers.run.return_value = mock_container
# mock successful run
mock_container.logs.return_value = [
'fake'.encode('utf-8'),
'container'.encode('utf-8'),
'logs'.encode('utf-8'),
]
mock_container.wait.return_value = {'StatusCode': 0}
def teardown(self):
super().tearDown()
self.docker_mock.reset_mock()
@ -48,7 +58,7 @@ class TestRunDockerContainer(DockerMockTestCase):
)
self.mocked_docker_client.containers.run.assert_called_once_with(
image='alpine',
image='alpine:latest',
command=['echo', 'foo'],
detach=True,
stdout=True,
@ -68,7 +78,7 @@ class TestRunDockerContainer(DockerMockTestCase):
}},
)
self.mocked_docker_client.containers.run.assert_called_once_with(
image='alpine',
image='alpine:latest',
command=['cat', '/localdir/docker_task_handler_test.py'],
detach=True,
stdout=True,
@ -84,9 +94,7 @@ class TestDockerTaskHandler(DockerMockTestCase):
def test_get_volumes_to_mount(self):
handler = docker_task_handler.DockerTaskHandler(
image='alpine',
# TODO: update to not use executor_main once container components
# supported
full_command=['kfp.dsl.executor_main', 'something else'],
full_command=['echo', 'foo'],
pipeline_root=os.path.abspath('my_root'),
runner=local.DockerRunner(),
)
@ -102,17 +110,15 @@ class TestDockerTaskHandler(DockerMockTestCase):
def test_run(self):
handler = docker_task_handler.DockerTaskHandler(
image='alpine',
# TODO: update to not use executor_main once container components
# supported
full_command=['kfp.dsl.executor_main', 'something else'],
full_command=['echo', 'foo'],
pipeline_root=os.path.abspath('my_root'),
runner=local.DockerRunner(),
)
handler.run()
self.mocked_docker_client.containers.run.assert_called_once_with(
image='alpine',
command=['kfp.dsl.executor_main', 'something else'],
image='alpine:latest',
command=['echo', 'foo'],
detach=True,
stdout=True,
stderr=True,
@ -131,32 +137,37 @@ class TestDockerTaskHandler(DockerMockTestCase):
):
docker_task_handler.DockerTaskHandler(
image='alpine',
# TODO: update to not use executor_main once container components
# supported
full_command=['kfp.dsl.executor_main', 'something else'],
full_command=['echo', 'foo'],
pipeline_root='my_relpath',
runner=local.DockerRunner(),
).run()
class TestPullImage(DockerMockTestCase):
class TestAddLatestTagIfNotPresent(unittest.TestCase):
def test_with_tag(self):
docker_task_handler.pull_image(
client=docker.from_env(), image='foo:123')
self.mocked_docker_client.images.pull.assert_called_once_with(
repository='foo', tag='123')
def test_no_tag(self):
actual = docker_task_handler.add_latest_tag_if_not_present(
image='alpine')
expected = 'alpine:latest'
self.assertEqual(actual, expected)
def test_with_no_tag(self):
docker_task_handler.pull_image(client=docker.from_env(), image='foo')
self.mocked_docker_client.images.pull.assert_called_once_with(
repository='foo', tag='latest')
def test_latest_tag(self):
actual = docker_task_handler.add_latest_tag_if_not_present(
image='alpine:latest')
expected = 'alpine:latest'
self.assertEqual(actual, expected)
def test_no_tag(self):
actual = docker_task_handler.add_latest_tag_if_not_present(
image='alpine:123')
expected = 'alpine:123'
self.assertEqual(actual, expected)
class TestE2E(DockerMockTestCase,
testing_utilities.LocalRunnerEnvironmentTestCase):
def test(self):
def test_python(self):
local.init(runner=local.DockerRunner())
@dsl.component
@ -166,8 +177,8 @@ class TestE2E(DockerMockTestCase,
try:
artifact_maker(x='foo')
except Exception:
# cannot get outputs if they aren't created due to mock
# cannot get outputs if they aren't created due to mock
except FileNotFoundError:
pass
run_mock = self.mocked_docker_client.containers.run
@ -188,6 +199,65 @@ class TestE2E(DockerMockTestCase,
self.assertEqual(kwargs['volumes'][root_vol_key]['bind'], root_vol_key)
self.assertEqual(kwargs['volumes'][root_vol_key]['mode'], 'rw')
def test_empty_container_component(self):
local.init(runner=local.DockerRunner())
@dsl.container_component
def comp():
return dsl.ContainerSpec(image='alpine')
try:
comp()
# cannot get outputs if they aren't created due to mock
except FileNotFoundError:
pass
run_mock = self.mocked_docker_client.containers.run
run_mock.assert_called_once()
kwargs = run_mock.call_args[1]
self.assertEqual(
kwargs['image'],
'alpine:latest',
)
self.assertEqual(kwargs['command'], [])
def test_container_component(self):
local.init(runner=local.DockerRunner())
@dsl.container_component
def artifact_maker(x: str,):
return dsl.ContainerSpec(
image='alpine',
command=['sh', '-c', f'echo prefix-{x}'],
)
try:
artifact_maker(x='foo')
# cannot get outputs if they aren't created due to mock
except FileNotFoundError:
pass
run_mock = self.mocked_docker_client.containers.run
run_mock.assert_called_once()
kwargs = run_mock.call_args[1]
self.assertEqual(
kwargs['image'],
'alpine:latest',
)
self.assertEqual(kwargs['command'], [
'sh',
'-c',
'echo prefix-foo',
])
self.assertTrue(kwargs['detach'])
self.assertTrue(kwargs['stdout'])
self.assertTrue(kwargs['stderr'])
root_vol_key = [
key for key in kwargs['volumes'].keys() if 'local_outputs' in key
][0]
self.assertEqual(kwargs['volumes'][root_vol_key]['bind'], root_vol_key)
self.assertEqual(kwargs['volumes'][root_vol_key]['mode'], 'rw')
if __name__ == '__main__':
unittest.main()

View File

@ -28,6 +28,10 @@ def load_executor_output(
executor_output_path: str) -> pipeline_spec_pb2.ExecutorOutput:
"""Loads the ExecutorOutput message from a path."""
executor_output = pipeline_spec_pb2.ExecutorOutput()
if not os.path.isfile(executor_output_path):
return executor_output
with open(executor_output_path) as f:
json_format.Parse(f.read(), executor_output)
return executor_output

View File

@ -153,19 +153,23 @@ class TestLoadExecutorOutput(unittest.TestCase):
path = os.path.join(tempdir, 'executor_output.json')
testing_utilities.write_proto_to_json_file(executor_output, path)
result = executor_output_utils.load_executor_output(path)
self.assertIsInstance(result, pipeline_spec_pb2.ExecutorOutput)
actual = executor_output_utils.load_executor_output(path)
expected = pipeline_spec_pb2.ExecutorOutput()
expected.parameter_values['foo'].CopyFrom(
struct_pb2.Value(string_value='foo_value'))
self.assertEqual(
result.parameter_values['foo'],
struct_pb2.Value(string_value='foo_value'),
actual.SerializeToString(deterministic=True),
expected.SerializeToString(deterministic=True),
)
def test_not_exists(self):
non_existent_path = 'non_existent_path.json'
with self.assertRaisesRegex(FileNotFoundError,
r'No such file or directory:'):
executor_output_utils.load_executor_output(non_existent_path)
actual = executor_output_utils.load_executor_output(non_existent_path)
expected = pipeline_spec_pb2.ExecutorOutput()
self.assertEqual(
actual.SerializeToString(deterministic=True),
expected.SerializeToString(deterministic=True),
)
class TestGetOutputsFromExecutorOutput(unittest.TestCase):

View File

@ -14,7 +14,8 @@
"""Utilities for working with placeholders."""
import json
import random
from typing import Any, Dict, List
import re
from typing import Any, Dict, List, Optional
from kfp import dsl
@ -47,6 +48,77 @@ def replace_placeholders(
]
def get_value_using_path(
dictionary: Dict[str, Any],
path: List[str],
) -> Optional[Any]:
list_or_dict = dictionary
if not path:
raise ValueError('path cannot be empty.')
try:
for p in path:
list_or_dict = list_or_dict[p]
return list_or_dict
except KeyError:
return None
def convert_placeholder_parts_to_path(parts: List[str]) -> List[str]:
# if inputs, parameters --> parameterValues
if parts[0] == 'inputs' and parts[1] == 'parameters':
parts[1] = 'parameterValues'
# if outputs, parameter output_file --> outputFile
if parts[0] == 'outputs' and parts[1] == 'parameters' and parts[
3] == 'output_file':
parts[3] = 'outputFile'
# if artifacts...
if parts[1] == 'artifacts':
# ...need to get nested artifact object...
parts.insert(3, 'artifacts')
# ...and first entry in list with index 0
parts.insert(4, 0)
# for local, path is the uri
if parts[5] == 'path':
parts[5] = 'uri'
return parts
def resolve_io_placeholders(
executor_input: Dict[str, Any],
command: str,
) -> str:
placeholders = re.findall(r'\{\{\$\.(.*?)\}\}', command)
# e.g., placeholder = "inputs.parameters[''text'']"
for placeholder in placeholders:
if 'json_escape' in placeholder:
raise ValueError('JSON escape placeholders are not supported.')
# e.g., parts = ['inputs', 'parameters', '', 'text', '', '']
parts = re.split(r'\.|\[|\]|\'\'|\'', placeholder)
# e.g., nonempty_parts = ['inputs', 'parameters', 'text']
nonempty_parts = [part for part in parts if part]
# e.g., path = ['inputs', 'parameterValues', 'text']
path = convert_placeholder_parts_to_path(nonempty_parts)
# e.g., path = ['inputs', 'parameterValues', 'text']
value = get_value_using_path(executor_input, path)
if value is not None:
if not isinstance(value, str):
value = json.dumps(value)
command = command.replace('{{$.' + placeholder + '}}', value)
return command
# TODO: support concat and if-present placeholders
def replace_placeholder_for_element(
element: str,
executor_input_dict: Dict[str, Any],
@ -75,7 +147,10 @@ def replace_placeholder_for_element(
dsl.PIPELINE_ROOT_PLACEHOLDER:
pipeline_root,
}
# match on literal for constant placeholders
for placeholder, value in PLACEHOLDERS.items():
element = element.replace(placeholder, value)
return element
# match differently for non-constant placeholders (i.e., have key(s))
return resolve_io_placeholders(executor_input_dict, element)

View File

@ -26,7 +26,10 @@ json_format.ParseDict(
{
'inputs': {
'parameterValues': {
'boolean': False
'boolean': False,
'dictionary': {
'foo': 'bar'
},
}
},
'outputs': {
@ -47,7 +50,11 @@ json_format.ParseDict(
},
'uri':
'/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/comp/out_a',
'metadata': {}
'metadata': {
'foo': {
'bar': 'baz'
}
}
}]
}
},
@ -87,6 +94,9 @@ class TestReplacePlaceholders(unittest.TestCase):
class TestReplacePlaceholderForElement(parameterized.TestCase):
# TODO: consider supporting JSON escape
# TODO: update when input artifact constants supported
# TODO: update when output lists of artifacts are supported
@parameterized.parameters([
(
'{{$}}',
@ -121,7 +131,7 @@ class TestReplacePlaceholderForElement(parameterized.TestCase):
'/foo/bar/my-pipeline-2023-10-10-13-32-59-420710',
),
])
def test(self, element: str, expected: str):
def test_constant_placeholders(self, element: str, expected: str):
actual = placeholder_utils.replace_placeholder_for_element(
element=element,
executor_input_dict=EXECUTOR_INPUT_DICT,
@ -160,6 +170,103 @@ class TestReplacePlaceholderForElement(parameterized.TestCase):
)
self.assertEqual(actual, expected)
@parameterized.parameters([
(
"{{$.inputs.parameters[''boolean'']}}",
json.dumps(False),
),
(
"{{$.outputs.artifacts[''out_a''].metadata}}",
json.dumps({'foo': {
'bar': 'baz'
}}),
),
(
"{{$.outputs.parameters[''Output''].output_file}}",
'/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/comp/Output',
),
(
"{{$.outputs.artifacts[''out_a''].uri}}",
'/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/comp/out_a',
),
(
"{{$.outputs.artifacts[''out_a''].path}}",
'/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/comp/out_a',
),
(
"{{$.outputs.artifacts[''out_a''].metadata[''foo'']}}",
json.dumps({'bar': 'baz'}),
),
])
def test_io_placeholders(self, element: str, expected: str):
actual = placeholder_utils.replace_placeholder_for_element(
element=element,
executor_input_dict=EXECUTOR_INPUT_DICT,
pipeline_resource_name='my-pipeline-2023-10-10-13-32-59-420710',
task_resource_name='comp',
pipeline_root='/foo/bar/my-pipeline-2023-10-10-13-32-59-420710',
pipeline_job_id='123456789',
pipeline_task_id='987654321',
)
self.assertEqual(actual, expected)
@parameterized.parameters([
(
"my-prefix-{{$.inputs.parameters[''boolean'']}}-suffix",
'my-prefix-false-suffix',
),
(
"prefix{{$.outputs.parameters[''Output''].output_file}}/suffix",
'prefix/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/comp/Output/suffix',
),
(
"prefix{{$.inputs.parameters[''dictionary'']}}suffix",
'prefix{"foo": "bar"}suffix',
),
])
def test_io_placeholder_with_string_concat(self, element: str,
expected: str):
actual = placeholder_utils.replace_placeholder_for_element(
element=element,
executor_input_dict=EXECUTOR_INPUT_DICT,
pipeline_resource_name='my-pipeline-2023-10-10-13-32-59-420710',
task_resource_name='comp',
pipeline_root='/foo/bar/my-pipeline-2023-10-10-13-32-59-420710',
pipeline_job_id='123456789',
pipeline_task_id='987654321',
)
self.assertEqual(actual, expected)
class TestGetValueUsingPath(unittest.TestCase):
def test_valid_path(self):
actual = placeholder_utils.get_value_using_path(
{'a': {
'b': {
'c': 10
}
}},
['a', 'b', 'c'],
)
expected = 10
self.assertEqual(actual, expected)
def test_invalid_path(self):
actual = placeholder_utils.get_value_using_path(
{'a': {
'b': {
'c': 10
}
}},
['a', 'x'],
)
self.assertIsNone(actual)
def test_empty_path(self):
with self.assertRaisesRegex(ValueError, r'path cannot be empty\.'):
placeholder_utils.get_value_using_path({'a': 20}, [])
if __name__ == '__main__':
unittest.main()

View File

@ -64,8 +64,7 @@ class SubprocessTaskHandler(task_handler_interface.ITaskHandler):
def validate_image(self, image: str) -> None:
if 'python' not in image:
warnings.warn(
f"You may be attemping to run a task that uses custom or non-Python base image '{image}' in a Python environment. This may result in incorrect dependencies and/or incorrect behavior.",
# TODO: suggest using container runner
f"You may be attemping to run a task that uses custom or non-Python base image '{image}' in a Python environment. This may result in incorrect dependencies and/or incorrect behavior. Consider using the 'DockerRunner' to run this task in a container.",
RuntimeWarning,
)

View File

@ -86,12 +86,12 @@ def _run_single_component_implementation(
component_spec.executor_label]
container = executor_spec['container']
full_command = list(container['command']) + list(container['args'])
# image + full_command are "inputs" to local execution
image = container['image']
# TODO: handler container component placeholders when
# ContainerRunner is implemented
command = list(container['command']) if 'command' in container else []
args = list(container['args']) if 'args' in container else []
full_command = command + args
executor_input_dict = executor_input_utils.executor_input_to_dict(
executor_input=executor_input,
component_spec=component_spec,