# Source file: pipelines/sdk/python/kfp/v2/components/executor_main.py
# (85 lines, 2.8 KiB, Python)

# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import importlib
import os
import sys
from kfp.v2.components import executor as component_executor
def _load_module(module_name: str, module_directory: str):
    """Dynamically import a Python module from a source file in a directory.

    E.g., assuming there is a file called `my_module.py` under
    `/some/directory`, we can use::

        _load_module('my_module', '/some/directory')

    to effectively `import my_module`.

    The module is registered in `sys.modules` under `module_name` before its
    code runs. If executing the module raises, the entry is removed from
    `sys.modules` again and the exception propagates.

    Args:
        module_name: The name of the module (the file name without `.py`).
        module_directory: The directory containing `<module_name>.py`.

    Returns:
        The loaded module object.

    Raises:
        ImportError: If no import spec/loader can be created for the file.
    """
    # `import importlib` alone does not guarantee that the `importlib.util`
    # submodule is loaded, so import it explicitly here.
    import importlib.util

    module_path = os.path.join(module_directory, f'{module_name}.py')
    module_spec = importlib.util.spec_from_file_location(
        name=module_name, location=module_path)
    if module_spec is None or module_spec.loader is None:
        raise ImportError(
            f'Cannot create an import spec for module {module_name!r} '
            f'at {module_path!r}.')

    module = importlib.util.module_from_spec(module_spec)
    # Register before executing so code that looks the module up by name
    # while it is being executed can find it.
    sys.modules[module_spec.name] = module
    try:
        module_spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module behind in sys.modules.
        sys.modules.pop(module_spec.name, None)
        raise
    return module
def executor_main():
    """Command-line entry point that loads and runs a KFP component function.

    Parses `--component_module_path`, `--function_to_execute` and
    `--executor_input` from the command line (unrecognised arguments are
    ignored), loads the module file, looks up the named function in it, and
    runs it through `component_executor.Executor` with the JSON-decoded
    executor input.

    All three flags are required; argparse exits with a usage error when one
    is missing.
    """
    parser = argparse.ArgumentParser(description='KFP Component Executor.')
    parser.add_argument(
        '--component_module_path',
        type=str,
        required=True,
        help='Path to a module containing the KFP component.')
    parser.add_argument(
        '--function_to_execute',
        type=str,
        required=True,
        help='The name of the component function in '
        '--component_module_path file that is to be executed.')
    parser.add_argument(
        '--executor_input',
        type=str,
        required=True,
        help='JSON-serialized ExecutorInput from the orchestrator. '
        'This should contain inputs and placeholders for outputs.')
    # parse_known_args tolerates extra flags this entry point does not define.
    args, _ = parser.parse_known_args()

    module_directory = os.path.dirname(args.component_module_path)
    module_file = os.path.basename(args.component_module_path)
    # Strip the '.py' suffix only when it is actually present, rather than
    # blindly chopping the last three characters of the file name.
    if module_file.endswith('.py'):
        module_name = module_file[:-len('.py')]
    else:
        module_name = module_file

    print('Loading KFP component module {} from dir {}'.format(
        module_name, module_directory))
    module = _load_module(
        module_name=module_name, module_directory=module_directory)

    executor_input = json.loads(args.executor_input)
    function_to_execute = getattr(module, args.function_to_execute)

    executor = component_executor.Executor(
        executor_input=executor_input, function_to_execute=function_to_execute)
    executor.execute()
# Run the executor only when this file is executed as a script; importing the
# module does not trigger it.
if __name__ == '__main__':
    executor_main()