140 lines
5.1 KiB
Python
# Copyright 2024 The Kubeflow Authors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""Code for locally executing a compiled pipeline."""
|
|
import logging
|
|
from typing import Any, Dict, List
|
|
|
|
from kfp.local import config
|
|
from kfp.local import dag_orchestrator
|
|
from kfp.local import logging_utils
|
|
from kfp.local import placeholder_utils
|
|
from kfp.local import status
|
|
from kfp.local import utils
|
|
from kfp.pipeline_spec import pipeline_spec_pb2
|
|
|
|
|
|
def run_local_pipeline(
    pipeline_spec: pipeline_spec_pb2.PipelineSpec,
    arguments: Dict[str, Any],
) -> Dict[str, Any]:
    """kfp.local's entrypoint for running a local pipeline.

    Args:
        pipeline_spec: PipelineSpec to run.
        arguments: User-provided arguments.

    Returns:
        The pipeline outputs.
    """
    # validate and access all global state in this function, not downstream
    config.LocalExecutionConfig.validate()
    local_config = config.LocalExecutionConfig.instance
    return _run_local_pipeline_implementation(
        pipeline_spec=pipeline_spec,
        arguments=arguments,
        raise_on_error=local_config.raise_on_error,
        pipeline_root=local_config.pipeline_root,
        runner=local_config.runner,
    )
|
|
|
|
|
|
def _run_local_pipeline_implementation(
    pipeline_spec: pipeline_spec_pb2.PipelineSpec,
    arguments: Dict[str, Any],
    raise_on_error: bool,
    pipeline_root: str,
    runner: config.LocalRunnerType,
) -> Dict[str, Any]:
    """Implementation of run local pipeline.

    Args:
        pipeline_spec: PipelineSpec to run.
        arguments: User-provided arguments.
        raise_on_error: Whether to raise an exception if a task exits with failure.
        pipeline_root: The local pipeline root.
        runner: The user-specified local runner.

    Returns:
        The pipeline outputs.
    """
    # imported here, not at module level — presumably to avoid a circular
    # import within kfp.local; TODO(review): confirm before moving to top
    from kfp.local import executor_input_utils

    pipeline_name = pipeline_spec.pipeline_info.name
    pipeline_resource_name = executor_input_utils.get_local_pipeline_resource_name(
        pipeline_name)
    colored_pipeline_name = logging_utils.format_pipeline_name(pipeline_name)

    with logging_utils.local_logger_context():
        logging.info(f'Running pipeline: {colored_pipeline_name}')
        logging_utils.print_horizontal_line()

    # materialize executor specs from the deployment spec's struct entries
    executor_structs = pipeline_spec.deployment_spec['executors']
    executors = {
        executor_name: utils.struct_to_executor_spec(executor_struct)
        for executor_name, executor_struct in executor_structs.items()
    }
    # convert to dict for consistency with executors
    components = dict(pipeline_spec.components.items())

    # run_dag appends the name of each failing task (innermost first) here
    fail_stack: List[str] = []
    outputs, dag_status = dag_orchestrator.run_dag(
        pipeline_resource_name=pipeline_resource_name,
        dag_component_spec=pipeline_spec.root,
        executors=executors,
        components=components,
        dag_arguments=arguments,
        pipeline_root=pipeline_root,
        runner=runner,
        unique_pipeline_id=placeholder_utils.make_random_id(),
        fail_stack=fail_stack,
    )

    # guard clauses: handle failure and the impossible status first,
    # then fall through to the success path
    if dag_status == status.Status.FAILURE:
        log_and_maybe_raise_for_failure(
            pipeline_name=pipeline_name,
            fail_stack=fail_stack,
            raise_on_error=raise_on_error,
        )
        return {}
    if dag_status != status.Status.SUCCESS:
        raise ValueError(f'Got unknown task status {dag_status.name}')

    status_with_color = logging_utils.format_status(status.Status.SUCCESS)
    with logging_utils.local_logger_context():
        logging.info(
            f'Pipeline {colored_pipeline_name} finished with status {status_with_color}'
        )
    return outputs
|
|
|
|
|
|
def log_and_maybe_raise_for_failure(
    pipeline_name: str,
    raise_on_error: bool,
    fail_stack: List[str],
) -> None:
    """To be called if an inner pipeline task exits with failure status. Either
    logs error or throws exception, depending on raise_on_error.

    Args:
        pipeline_name: The name of the root pipeline.
        raise_on_error: Whether to raise on error.
        fail_stack: The stack of task failures, if any, starting with the innermost task that failed to the outermost pipeline. Excludes the root pipeline.
    """
    failure_status_with_color = logging_utils.format_status(
        status.Status.FAILURE)
    colored_pipeline_name = logging_utils.format_pipeline_name(pipeline_name)
    colored_task_names = [
        logging_utils.format_task_name(task_name) for task_name in fail_stack
    ]
    task_chain_with_color = ' inside '.join(colored_task_names)
    msg = f'Pipeline {colored_pipeline_name} finished with status {failure_status_with_color}. Inner task failed: {task_chain_with_color}.'
    if raise_on_error:
        raise RuntimeError(msg)
    with logging_utils.local_logger_context():
        logging.error(msg)
|