diff --git a/sdk/python/kfp/compiler_cli_tests/test_data/lightweight_python_functions_v2_pipeline.py b/sdk/python/kfp/compiler_cli_tests/test_data/lightweight_python_functions_v2_pipeline.py index 40d7303710..0953a584bb 100644 --- a/sdk/python/kfp/compiler_cli_tests/test_data/lightweight_python_functions_v2_pipeline.py +++ b/sdk/python/kfp/compiler_cli_tests/test_data/lightweight_python_functions_v2_pipeline.py @@ -16,8 +16,13 @@ from typing import Dict, List from kfp import compiler from kfp import dsl -from kfp.dsl import (Dataset, Input, InputPath, Model, Output, OutputPath, - component) +from kfp.dsl import Dataset +from kfp.dsl import Input +from kfp.dsl import InputPath +from kfp.dsl import Model +from kfp.dsl import Output +from kfp.dsl import OutputPath +from kfp.dsl import component @component diff --git a/sdk/python/kfp/compiler_cli_tests/test_data/pipeline_with_resource_spec.py b/sdk/python/kfp/compiler_cli_tests/test_data/pipeline_with_resource_spec.py index 5473da6fbd..9779c6a90e 100644 --- a/sdk/python/kfp/compiler_cli_tests/test_data/pipeline_with_resource_spec.py +++ b/sdk/python/kfp/compiler_cli_tests/test_data/pipeline_with_resource_spec.py @@ -38,8 +38,8 @@ def my_pipeline(input_location: str = 'gs://test-bucket/pipeline_root', training_op( examples=ingestor.outputs['examples'], optimizer=optimizer, - n_epochs=n_epochs).set_cpu_limit('4').set_memory_limit( - '14Gi').add_node_selector_constraint('tpu-v3').set_gpu_limit('1')) + n_epochs=n_epochs).set_cpu_limit('4').set_memory_limit('14Gi') + .add_node_selector_constraint('tpu-v3').set_gpu_limit('1')) if __name__ == '__main__': diff --git a/sdk/python/kfp/components/component_decorator.py b/sdk/python/kfp/components/component_decorator.py index 5a521e5ac4..bbd26bcfaf 100644 --- a/sdk/python/kfp/components/component_decorator.py +++ b/sdk/python/kfp/components/component_decorator.py @@ -93,7 +93,7 @@ def component(func: Optional[Callable] = None, """ if output_component_file is not None: raise Exception("output_component_file is not supported yet in v2 early" - "releases and will be added back for v2.0.0 ") + "releases and will be added back for v2.0.0 ") if func is None: return functools.partial( diff --git a/sdk/python/kfp/components/pipeline_task.py b/sdk/python/kfp/components/pipeline_task.py index cc8e3dec58..916c0ddb1c 100644 --- a/sdk/python/kfp/components/pipeline_task.py +++ b/sdk/python/kfp/components/pipeline_task.py @@ -106,9 +106,7 @@ class PipelineTask: self.task_spec = structures.TaskSpec( name=self.register_task_handler(), - inputs={ - input_name: value for input_name, value in args.items() - }, + inputs={input_name: value for input_name, value in args.items()}, dependent_tasks=[], component_ref=component_spec.name, enable_caching=True, @@ -338,8 +336,7 @@ class PipelineTask: resolved_container_spec = copy.deepcopy(container_spec) resolved_container_spec.command = expand_argument_list( container_spec.command) - resolved_container_spec.args = expand_argument_list( - container_spec.args) + resolved_container_spec.args = expand_argument_list(container_spec.args) return resolved_container_spec diff --git a/sdk/python/kfp/components/structures.py b/sdk/python/kfp/components/structures.py index 829f8a29e8..d6db20885f 100644 --- a/sdk/python/kfp/components/structures.py +++ b/sdk/python/kfp/components/structures.py @@ -375,7 +375,8 @@ class ComponentSpec(BaseModel): raise ValueError( f'Argument "{arg}" references non-existing input.') for placeholder in itertools.chain(arg.if_structure.then or [], - arg.if_structure.otherwise or []): + arg.if_structure.otherwise or + []): cls._check_valid_placeholder_reference(valid_inputs, valid_outputs, placeholder) @@ -413,15 +414,20 @@ class ComponentSpec(BaseModel): if isinstance(arg, str): return arg if 'inputValue' in arg: - return InputValuePlaceholder(input_name=utils.sanitize_input_name(arg['inputValue'])) + return InputValuePlaceholder( + input_name=utils.sanitize_input_name(arg['inputValue'])) if 'inputPath' in arg: - return InputPathPlaceholder(input_name=utils.sanitize_input_name(arg['inputPath'])) + return InputPathPlaceholder( + input_name=utils.sanitize_input_name(arg['inputPath'])) if 'inputUri' in arg: - return InputUriPlaceholder(input_name=utils.sanitize_input_name(arg['inputUri'])) + return InputUriPlaceholder( + input_name=utils.sanitize_input_name(arg['inputUri'])) if 'outputPath' in arg: - return OutputPathPlaceholder(output_name=utils.sanitize_input_name(arg['outputPath'])) + return OutputPathPlaceholder( + output_name=utils.sanitize_input_name(arg['outputPath'])) if 'outputUri' in arg: - return OutputUriPlaceholder(output_name=utils.sanitize_input_name(arg['outputUri'])) + return OutputUriPlaceholder( + output_name=utils.sanitize_input_name(arg['outputUri'])) if 'if' in arg: if_placeholder_values = arg['if'] if_placeholder_values_then = list(if_placeholder_values['then']) @@ -434,7 +440,8 @@ class ComponentSpec(BaseModel): IfPresentPlaceholderStructure.update_forward_refs() return IfPresentPlaceholder( if_structure=IfPresentPlaceholderStructure( - input_name=utils.sanitize_input_name(if_placeholder_values['cond']['isPresent']), + input_name=utils.sanitize_input_name( + if_placeholder_values['cond']['isPresent']), then=list( _transform_arg(val) for val in if_placeholder_values_then), @@ -497,7 +504,8 @@ class ComponentSpec(BaseModel): for spec in component_dict.get('inputs', []) }, outputs={ - utils.sanitize_input_name(spec['name']): OutputSpec(type=spec.get('type', 'Artifact')) + utils.sanitize_input_name(spec['name']): + OutputSpec(type=spec.get('type', 'Artifact')) for spec in component_dict.get('outputs', []) }) @@ -554,8 +562,8 @@ class ComponentSpec(BaseModel): for cmd in self.implementation.container.command or [] ], args=[ - _transform_arg(arg) for arg in - self.implementation.container.args or [] + _transform_arg(arg) + for arg in self.implementation.container.args or [] ], env={ name: _transform_arg(value) for name, value in diff --git a/sdk/python/kfp/components/types/type_annotations_test.py b/sdk/python/kfp/components/types/type_annotations_test.py index 3b30b5fe87..413d9e6e24 100644 --- a/sdk/python/kfp/components/types/type_annotations_test.py +++ b/sdk/python/kfp/components/types/type_annotations_test.py @@ -20,9 +20,8 @@ from absl.testing import parameterized from kfp.components.types import type_annotations from kfp.components.types.artifact_types import Model from kfp.components.types.type_annotations import (Input, InputAnnotation, - InputPath, Output, - OutputAnnotation, - OutputPath) + InputPath, Output, + OutputAnnotation, OutputPath) class AnnotationsTest(parameterized.TestCase): diff --git a/sdk/python/kfp/deprecated/_local_client.py b/sdk/python/kfp/deprecated/_local_client.py index 8f3b445ae9..da94e07ea8 100644 --- a/sdk/python/kfp/deprecated/_local_client.py +++ b/sdk/python/kfp/deprecated/_local_client.py @@ -346,13 +346,12 @@ class LocalClient: return cmd def _generate_cmd_for_docker_execution( - self, - run_name: str, - pipeline: dsl.Pipeline, - op: dsl.ContainerOp, - stack: Dict[str, Any], - docker_options: List[str] = [] - ) -> List[str]: + self, + run_name: str, + pipeline: dsl.Pipeline, + op: dsl.ContainerOp, + stack: Dict[str, Any], + docker_options: List[str] = []) -> List[str]: """Generate the command to run the op in docker locally.""" cmd = self._generate_cmd_for_subprocess_execution( run_name, pipeline, op, stack) @@ -394,8 +393,8 @@ class LocalClient: for node in group_dag.topological_sort(): subgroup = _get_subgroup(current_group.groups, node) if subgroup is not None: # Node of DAG is subgroup - success = self._run_group(run_name, pipeline, pipeline_dag, subgroup, - stack, execution_mode) + success = self._run_group(run_name, pipeline, pipeline_dag, + subgroup, stack, execution_mode) if not success: return False else: # Node of DAG is op @@ -416,7 +415,8 @@ class LocalClient: run_name, pipeline, op, stack) else: cmd = self._generate_cmd_for_docker_execution( - run_name, pipeline, op, stack, execution_mode.docker_options) + run_name, pipeline, op, stack, + execution_mode.docker_options) process = subprocess.Popen( cmd, shell=False, @@ -492,8 +492,8 @@ class LocalClient: else: raise Exception("Not implemented") else: - return self._run_group_dag(run_name, pipeline, pipeline_dag, current_group, - stack, execution_mode) + return self._run_group_dag(run_name, pipeline, pipeline_dag, + current_group, stack, execution_mode) def create_run_from_pipeline_func( self, @@ -540,7 +540,7 @@ class LocalClient: run_name = pipeline.name.replace(" ", "_").lower() + "_" + run_version pipeline_dag = self._create_op_dag(pipeline) - success = self._run_group(run_name, pipeline, pipeline_dag, pipeline.groups[0], - {}, execution_mode) + success = self._run_group(run_name, pipeline, pipeline_dag, + pipeline.groups[0], {}, execution_mode) return RunPipelineResult(self, pipeline, run_name, success=success) diff --git a/sdk/python/kfp/deprecated/cli/experiment.py b/sdk/python/kfp/deprecated/cli/experiment.py index 262a5a8ca3..0ed007d3fc 100644 --- a/sdk/python/kfp/deprecated/cli/experiment.py +++ b/sdk/python/kfp/deprecated/cli/experiment.py @@ -122,11 +122,13 @@ def _display_experiment(exp: kfp_server_api.ApiExperiment, @click.option( "--experiment-id", default=None, - help="The ID of the experiment to archive, can only supply either an experiment ID or name.") + help="The ID of the experiment to archive, can only supply either an experiment ID or name." +) @click.option( "--experiment-name", default=None, - help="The name of the experiment to archive, can only supply either an experiment ID or name.") + help="The name of the experiment to archive, can only supply either an experiment ID or name." +) @click.pass_context def archive(ctx: click.Context, experiment_id: str, experiment_name: str): """Archive an experiment""" diff --git a/sdk/python/kfp/deprecated/cli/recurring_run.py b/sdk/python/kfp/deprecated/cli/recurring_run.py index fc0f20b273..8b7370be4d 100644 --- a/sdk/python/kfp/deprecated/cli/recurring_run.py +++ b/sdk/python/kfp/deprecated/cli/recurring_run.py @@ -48,10 +48,12 @@ def recurring_run(): help='The RFC3339 time string of the time when to end the job.') @click.option( '--experiment-id', - help='The ID of the experiment to create the recurring run under, can only supply either an experiment ID or name.') + help='The ID of the experiment to create the recurring run under, can only supply either an experiment ID or name.' +) @click.option( '--experiment-name', - help='The name of the experiment to create the recurring run under, can only supply either an experiment ID or name.') + help='The name of the experiment to create the recurring run under, can only supply either an experiment ID or name.' +) @click.option('--job-name', help='The name of the recurring run.') @click.option( '--interval-second', diff --git a/sdk/python/kfp/deprecated/compiler/_data_passing_using_volume.py b/sdk/python/kfp/deprecated/compiler/_data_passing_using_volume.py index e06adab6d6..562835de40 100644 --- a/sdk/python/kfp/deprecated/compiler/_data_passing_using_volume.py +++ b/sdk/python/kfp/deprecated/compiler/_data_passing_using_volume.py @@ -129,11 +129,15 @@ def rewrite_data_passing_to_use_volumes( 'value': output_subpath, # Requires Argo 2.3.0+ }) whitelist = ['mlpipeline-ui-metadata', 'mlpipeline-metrics'] - output_artifacts = [artifact for artifact in output_artifacts if artifact['name'] in whitelist] + output_artifacts = [ + artifact for artifact in output_artifacts + if artifact['name'] in whitelist + ] if not output_artifacts: - template.get('outputs', {}).pop('artifacts', None) + template.get('outputs', {}).pop('artifacts', None) else: - template.get('outputs', {}).update({'artifacts': output_artifacts}) + template.get('outputs', + {}).update({'artifacts': output_artifacts}) # Rewrite DAG templates for template in templates: diff --git a/sdk/python/kfp/deprecated/compiler/_op_to_template.py b/sdk/python/kfp/deprecated/compiler/_op_to_template.py index db9cef22ba..8dfd7d29f1 100644 --- a/sdk/python/kfp/deprecated/compiler/_op_to_template.py +++ b/sdk/python/kfp/deprecated/compiler/_op_to_template.py @@ -319,9 +319,9 @@ def _op_to_template(op: BaseOp): and re.match('^{{inputs.parameters.*}}$', str(param)): if not 'containers' in podSpecPatch: podSpecPatch['containers'] = [{ - 'name': 'main', - 'resources': {} - }] + 'name': 'main', + 'resources': {} + }] if setting not in podSpecPatch['containers'][0][ 'resources']: podSpecPatch['containers'][0]['resources'][setting] = { diff --git a/sdk/python/kfp/deprecated/compiler/compiler.py b/sdk/python/kfp/deprecated/compiler/compiler.py index 0fcc996d93..0e184d6ea3 100644 --- a/sdk/python/kfp/deprecated/compiler/compiler.py +++ b/sdk/python/kfp/deprecated/compiler/compiler.py @@ -831,7 +831,10 @@ class Compiler(object): # set ttl after workflow finishes if pipeline_conf.ttl_seconds_after_finished >= 0: - workflow['spec']['ttlStrategy'] = {'secondsAfterCompletion': pipeline_conf.ttl_seconds_after_finished} + workflow['spec']['ttlStrategy'] = { + 'secondsAfterCompletion': + pipeline_conf.ttl_seconds_after_finished + } if pipeline_conf._pod_disruption_budget_min_available: pod_disruption_budget = { diff --git a/sdk/python/kfp/deprecated/compiler/main.py b/sdk/python/kfp/deprecated/compiler/main.py index ab20e3d7ad..1fc26984f6 100644 --- a/sdk/python/kfp/deprecated/compiler/main.py +++ b/sdk/python/kfp/deprecated/compiler/main.py @@ -82,8 +82,9 @@ def _compile_pipeline_function( else: pipeline_func = pipeline_funcs[0] - kfp.deprecated.compiler.Compiler(mode=mode).compile(pipeline_func, output_path, - type_check, pipeline_conf) + kfp.deprecated.compiler.Compiler(mode=mode).compile(pipeline_func, + output_path, type_check, + pipeline_conf) class PipelineCollectorContext(): diff --git a/sdk/python/kfp/deprecated/components/_components.py b/sdk/python/kfp/deprecated/components/_components.py index 325a8f672b..2488579a47 100644 --- a/sdk/python/kfp/deprecated/components/_components.py +++ b/sdk/python/kfp/deprecated/components/_components.py @@ -136,7 +136,7 @@ def load_component_from_spec(component_spec): if component_spec is None: raise TypeError return _create_task_factory_from_component_spec( - component_spec=component_spec) + component_spec=component_spec) def _fix_component_uri(uri: str) -> str: diff --git a/sdk/python/kfp/deprecated/components_tests/test_python_op.py b/sdk/python/kfp/deprecated/components_tests/test_python_op.py index ca56edd3c1..cd75384979 100644 --- a/sdk/python/kfp/deprecated/components_tests/test_python_op.py +++ b/sdk/python/kfp/deprecated/components_tests/test_python_op.py @@ -21,8 +21,9 @@ from pathlib import Path from typing import Callable, NamedTuple, Sequence from kfp.deprecated import components as comp -from kfp.deprecated.components import (InputBinaryFile, InputPath, InputTextFile, - OutputBinaryFile, OutputPath, OutputTextFile) +from kfp.deprecated.components import (InputBinaryFile, InputPath, + InputTextFile, OutputBinaryFile, + OutputPath, OutputTextFile) from kfp.deprecated.components._components import _resolve_command_line_and_paths from kfp.deprecated.components.structures import InputSpec, OutputSpec diff --git a/sdk/python/kfp/deprecated/dsl/_component.py b/sdk/python/kfp/deprecated/dsl/_component.py index 92d72edd4f..0098932fd6 100644 --- a/sdk/python/kfp/deprecated/dsl/_component.py +++ b/sdk/python/kfp/deprecated/dsl/_component.py @@ -18,7 +18,6 @@ from .types import check_types, InconsistentTypeException from ._ops_group import Graph import kfp.deprecated as kfp - # @deprecated( # version='0.2.6', # reason='This decorator does not seem to be used, so we deprecate it. ' diff --git a/sdk/python/kfp/deprecated/dsl/_component_bridge.py b/sdk/python/kfp/deprecated/dsl/_component_bridge.py index 2c26f551c2..2ae2b83876 100644 --- a/sdk/python/kfp/deprecated/dsl/_component_bridge.py +++ b/sdk/python/kfp/deprecated/dsl/_component_bridge.py @@ -631,7 +631,7 @@ def _attach_v2_specs( argument_is_parameter_type = type_utils.is_parameter_type(argument_type) input_is_parameter_type = type_utils.is_parameter_type(input_type) if COMPILING_FOR_V2 and (argument_is_parameter_type != - input_is_parameter_type): + input_is_parameter_type): if isinstance(argument_value, dsl.PipelineParam): param_or_value_msg = 'PipelineParam "{}"'.format( argument_value.full_name) diff --git a/sdk/python/kfp/deprecated/dsl/_container_op.py b/sdk/python/kfp/deprecated/dsl/_container_op.py index e0f39c4a86..aec00a5e1d 100644 --- a/sdk/python/kfp/deprecated/dsl/_container_op.py +++ b/sdk/python/kfp/deprecated/dsl/_container_op.py @@ -28,7 +28,6 @@ from kubernetes.client.models import (V1Container, V1ContainerPort, V1SecurityContext, V1Volume, V1VolumeDevice, V1VolumeMount) - # generics T = TypeVar('T') # type alias: either a string or a list of string @@ -1446,7 +1445,8 @@ class ContainerOp(BaseOp): is_legacy_name, normalized_name = _is_legacy_output_name( output.name) if is_legacy_name and normalized_name in self.output_artifact_paths: - output_filename = self.output_artifact_paths[normalized_name] + output_filename = self.output_artifact_paths[ + normalized_name] else: output_filename = _components._generate_output_file_name( output.name) diff --git a/sdk/python/kfp/deprecated/dsl/_volume_op.py b/sdk/python/kfp/deprecated/dsl/_volume_op.py index 31887b1a07..a69b419731 100644 --- a/sdk/python/kfp/deprecated/dsl/_volume_op.py +++ b/sdk/python/kfp/deprecated/dsl/_volume_op.py @@ -105,7 +105,8 @@ class VolumeOp(ResourceOp): if not match_serialized_pipelineparam(str(resource_name)): resource_name = sanitize_k8s_name(resource_name) pvc_metadata = V1ObjectMeta( - name="{{workflow.name}}-%s" % resource_name if generate_unique_name else resource_name, + name="{{workflow.name}}-%s" % + resource_name if generate_unique_name else resource_name, annotations=annotations) requested_resources = V1ResourceRequirements(requests={"storage": size}) pvc_spec = V1PersistentVolumeClaimSpec( diff --git a/sdk/python/tests/compiler/testdata/artifact_passing_using_volume.py b/sdk/python/tests/compiler/testdata/artifact_passing_using_volume.py index 3f25048863..6e8cf353fa 100644 --- a/sdk/python/tests/compiler/testdata/artifact_passing_using_volume.py +++ b/sdk/python/tests/compiler/testdata/artifact_passing_using_volume.py @@ -12,14 +12,18 @@ processor_op = load_component_from_file( consumer_op = load_component_from_file( str(test_data_dir / 'consume_2.component.yaml')) + def metadata_and_metrics() -> NamedTuple( "Outputs", - [("mlpipeline_ui_metadata", "UI_metadata"), ("mlpipeline_metrics", "Metrics")], + [("mlpipeline_ui_metadata", "UI_metadata"), ("mlpipeline_metrics", "Metrics" + )], ): metadata = { - "outputs": [ - {"storage": "inline", "source": "*this should be bold*", "type": "markdown"} - ] + "outputs": [{ + "storage": "inline", + "source": "*this should be bold*", + "type": "markdown" + }] } metrics = { "metrics": [ @@ -36,9 +40,10 @@ def metadata_and_metrics() -> NamedTuple( from collections import namedtuple import json - return namedtuple("output", ["mlpipeline_ui_metadata", "mlpipeline_metrics"])( - json.dumps(metadata), json.dumps(metrics) - ) + return namedtuple("output", + ["mlpipeline_ui_metadata", "mlpipeline_metrics"])( + json.dumps(metadata), json.dumps(metrics)) + @kfp.dsl.pipeline() def artifact_passing_pipeline(): diff --git a/sdk/python/tests/dsl/component_tests.py b/sdk/python/tests/dsl/component_tests.py index 0ee0c85f94..669d67e74f 100644 --- a/sdk/python/tests/dsl/component_tests.py +++ b/sdk/python/tests/dsl/component_tests.py @@ -23,6 +23,7 @@ from kfp.deprecated.dsl.types import Integer, GCSPath, InconsistentTypeException from kfp.deprecated.dsl import ContainerOp, Pipeline, PipelineParam from kfp.deprecated.components.structures import ComponentSpec, InputSpec, OutputSpec + @unittest.skip("deprecated") class TestGraphComponent(unittest.TestCase): diff --git a/sdk/python/tests/local_runner_test.py b/sdk/python/tests/local_runner_test.py index 181da75054..a2456cb162 100644 --- a/sdk/python/tests/local_runner_test.py +++ b/sdk/python/tests/local_runner_test.py @@ -228,14 +228,12 @@ class LocalRunnerTest(unittest.TestCase): check_option() run_result = run_pipeline_func_locally( - _pipeline, - {}, - execution_mode=LocalClient.ExecutionMode(mode="docker", - docker_options=["-e", "foo=bar"]) - ) + _pipeline, {}, + execution_mode=LocalClient.ExecutionMode( + mode="docker", docker_options=["-e", "foo=bar"])) assert run_result.success output_file_path = run_result.get_output_file("check-option") with open(output_file_path, "r") as f: line = f.readline() - assert "bar" in line \ No newline at end of file + assert "bar" in line