feat(sdk.v2): remove pipeline_root from compiler interface (#5492)

Chen Sun 2021-04-16 08:49:13 -07:00 committed by GitHub
parent 5c16e78d5f
commit 5f97a4502d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 37 additions and 92 deletions
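
In practice, this moves pipeline_root off the compiler interface: it is now set on the @dsl.pipeline decorator (or supplied at job submission) rather than passed to compile(). A minimal sketch of the new usage, assuming the imports used by the samples in this commit; the bucket path is illustrative:

    from kfp import dsl
    from kfp.v2 import compiler


    # Before this commit, compile(..., pipeline_root=...) could override the
    # decorator. After it, the decorator (or job submission) is the only
    # place to set the root.
    @dsl.pipeline(name='my-pipeline', pipeline_root='gs://my-bucket/root')
    def my_pipeline():
        pass


    compiler.Compiler().compile(
        pipeline_func=my_pipeline,
        package_path='pipeline_job.json')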

View File

@@ -1008,7 +1008,7 @@ class Compiler(object):
pull secrets and other pipeline-level configuration options. Overrides
any configuration that may be set by the pipeline.
"""
-pipeline_root_dir = getattr(pipeline_func, 'output_directory', None)
+pipeline_root_dir = getattr(pipeline_func, 'pipeline_root', None)
if (pipeline_root_dir is not None or
self._mode == dsl.PipelineExecutionMode.V2_COMPATIBLE):
self._pipeline_root_param = dsl.PipelineParam(

View File

@@ -72,7 +72,7 @@ def pipeline(
if description:
func._component_description = description
if pipeline_root:
-func.output_directory = pipeline_root
+func.pipeline_root = pipeline_root
if _pipeline_decorator_handler:
return _pipeline_decorator_handler(func) or func
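
The hand-off here is a plain attribute on the decorated function: the decorator stores pipeline_root on the function object, and the compiler reads it back with getattr(), as the first hunk above shows. A small sketch of that flow, with an illustrative path:

    from kfp import dsl


    @dsl.pipeline(name='my-pipeline', pipeline_root='gs://my-bucket/root')
    def my_pipeline():
        pass


    # Mirrors the compiler-side lookup from the first hunk.
    pipeline_root_dir = getattr(my_pipeline, 'pipeline_root', None)
    assert pipeline_root_dir == 'gs://my-bucket/root'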

View File

@@ -1112,7 +1112,6 @@ class Compiler(object):
def _create_pipeline_v2(
self,
pipeline_func: Callable[..., Any],
-pipeline_root: Optional[str] = None,
pipeline_name: Optional[str] = None,
pipeline_parameters_override: Optional[Mapping[str, Any]] = None,
) -> pipeline_spec_pb2.PipelineJob:
@@ -1120,7 +1119,6 @@
Args:
pipeline_func: Pipeline function with @dsl.pipeline decorator.
-pipeline_root: The root of the pipeline outputs. Optional.
pipeline_name: The name of the pipeline. Optional.
pipeline_parameters_override: The mapping from parameter names to values.
Optional.
@@ -1134,8 +1132,8 @@
pipeline_meta = _python_op._extract_component_interface(pipeline_func)
pipeline_name = pipeline_name or pipeline_meta.name
-pipeline_root = pipeline_root or getattr(pipeline_func, 'output_directory',
-None)
+pipeline_root = getattr(pipeline_func, 'pipeline_root', None)
if not pipeline_root:
warnings.warn('pipeline_root is None or empty. A valid pipeline_root '
'must be provided at job submission.')
@@ -1198,7 +1196,6 @@
def compile(self,
pipeline_func: Callable[..., Any],
package_path: str,
-pipeline_root: Optional[str] = None,
pipeline_name: Optional[str] = None,
pipeline_parameters: Optional[Mapping[str, Any]] = None,
type_check: bool = True) -> None:
@@ -1208,10 +1205,6 @@
pipeline_func: Pipeline function with @dsl.pipeline decorator.
package_path: The output pipeline job .json file path. for example,
"~/pipeline_job.json"
-pipeline_root: The root of the pipeline outputs. Optional. The
-pipeline_root value can be specified either from this `compile()` method
-or through the `@dsl.pipeline` decorator. If it's specified in both
-places, the value provided here prevails.
pipeline_name: The name of the pipeline. Optional.
pipeline_parameters: The mapping from parameter names to values. Optional.
type_check: Whether to enable the type check or not, default: True.
@@ -1221,7 +1214,6 @@
kfp.TYPE_CHECK = type_check
pipeline_job = self._create_pipeline_v2(
pipeline_func=pipeline_func,
-pipeline_root=pipeline_root,
pipeline_name=pipeline_name,
pipeline_parameters_override=pipeline_parameters)
self._write_pipeline(pipeline_job, package_path)
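
With the parameter gone, a pipeline that sets no pipeline_root anywhere still compiles, but with the warning above, and a valid root must then be given at job submission. When the decorator does set one, it lands in the compiled job's runtimeConfig.gcsOutputDirectory, which is what the tests below assert. A sketch of verifying that, with an illustrative path:

    import json

    from kfp import dsl
    from kfp.v2 import compiler


    @dsl.pipeline(name='my-pipeline', pipeline_root='gs://path')
    def my_pipeline():
        pass


    compiler.Compiler().compile(
        pipeline_func=my_pipeline, package_path='result.json')

    with open('result.json') as f:
        job_spec = json.load(f)

    # The decorator's pipeline_root is recorded in the job spec.
    assert job_spec['runtimeConfig']['gcsOutputDirectory'] == 'gs://path'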

View File

@@ -58,7 +58,7 @@ class CompilerTest(unittest.TestCase):
- {inputValue: input_value}
""")
-@dsl.pipeline(name='two-step-pipeline')
+@dsl.pipeline(name='test-pipeline', pipeline_root='dummy_root')
def simple_pipeline(pipeline_input:str='Hello KFP!'):
producer = producer_op(input_param=pipeline_input)
consumer = consumer_op(
@@ -68,7 +68,6 @@ class CompilerTest(unittest.TestCase):
target_json_file = os.path.join(tmpdir, 'result.json')
compiler.Compiler().compile(
pipeline_func=simple_pipeline,
-pipeline_root='dummy_root',
package_path=target_json_file)
self.assertTrue(os.path.exists(target_json_file))
@@ -102,7 +101,7 @@ class CompilerTest(unittest.TestCase):
- {inputValue: msg}
""")
-@dsl.pipeline(name='pipeline-with-exit-handler')
+@dsl.pipeline(name='test-pipeline', pipeline_root='dummy_root')
def download_and_print(url='gs://ml-pipeline/shakespeare/shakespeare1.txt'):
"""A sample pipeline showing exit handler."""
@@ -117,7 +116,6 @@ class CompilerTest(unittest.TestCase):
'dsl.ExitHandler is not yet supported in KFP v2 compiler.'):
compiler.Compiler().compile(
pipeline_func=download_and_print,
-pipeline_root='dummy_root',
package_path='output.json')
def test_compile_pipeline_with_dsl_graph_component_should_raise_error(self):
@@ -142,7 +140,7 @@ class CompilerTest(unittest.TestCase):
command=['sh', '-c'],
arguments=['echo "$0"', text2])
-@dsl.pipeline(name='pipeline-with-graph-component')
+@dsl.pipeline(name='test-pipeline', pipeline_root='dummy_root')
def opsgroups_pipeline(text1='message 1', text2='message 2'):
step1_graph_component = echo1_graph_component(text1)
step2_graph_component = echo2_graph_component(text2)
@@ -150,7 +148,6 @@ class CompilerTest(unittest.TestCase):
compiler.Compiler().compile(
pipeline_func=opsgroups_pipeline,
-pipeline_root='dummy_root',
package_path='output.json')
def test_compile_pipeline_with_misused_inputvalue_should_raise_error(self):
@@ -166,6 +163,7 @@ class CompilerTest(unittest.TestCase):
- {inputValue: model}
""")
+@dsl.pipeline(name='test-pipeline', pipeline_root='dummy_root')
def my_pipeline(model):
component_op(model=model)
@@ -174,7 +172,6 @@ class CompilerTest(unittest.TestCase):
' type "Model" cannot be paired with InputValuePlaceholder.'):
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy',
package_path='output.json')
def test_compile_pipeline_with_misused_inputpath_should_raise_error(self):
@@ -190,6 +187,7 @@ class CompilerTest(unittest.TestCase):
- {inputPath: text}
""")
+@dsl.pipeline(name='test-pipeline', pipeline_root='dummy_root')
def my_pipeline(text):
component_op(text=text)
@@ -198,7 +196,6 @@ class CompilerTest(unittest.TestCase):
' type "String" cannot be paired with InputPathPlaceholder.'):
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy',
package_path='output.json')
def test_compile_pipeline_with_misused_inputuri_should_raise_error(self):
@@ -214,6 +211,7 @@ class CompilerTest(unittest.TestCase):
- {inputUri: value}
""")
+@dsl.pipeline(name='test-pipeline', pipeline_root='dummy_root')
def my_pipeline(value):
component_op(value=value)
@@ -221,7 +219,6 @@
TypeError, ' type "Float" cannot be paired with InputUriPlaceholder.'):
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy',
package_path='output.json')
def test_compile_pipeline_with_misused_outputuri_should_raise_error(self):
@@ -237,6 +234,7 @@ class CompilerTest(unittest.TestCase):
- {outputUri: value}
""")
+@dsl.pipeline(name='test-pipeline', pipeline_root='dummy_root')
def my_pipeline():
component_op()
@@ -245,7 +243,6 @@ class CompilerTest(unittest.TestCase):
' type "Integer" cannot be paired with OutputUriPlaceholder.'):
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy',
package_path='output.json')
def test_compile_pipeline_with_invalid_name_should_raise_error(self):
@@ -259,7 +256,6 @@ class CompilerTest(unittest.TestCase):
):
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy',
package_path='output.json')
def test_compile_pipeline_with_importer_on_inputpath_should_raise_error(self):
@@ -276,7 +272,7 @@ class CompilerTest(unittest.TestCase):
- {inputPath: model}
""")
-@dsl.pipeline(name='my-component')
+@dsl.pipeline(name='test-pipeline', pipeline_root='dummy_root')
def my_pipeline(model):
component_op(model=model)
@@ -286,7 +282,6 @@ class CompilerTest(unittest.TestCase):
'output. However it is used with InputPathPlaceholder.'):
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy',
package_path='output.json')
# Python function based component authoring
@@ -295,7 +290,7 @@ class CompilerTest(unittest.TestCase):
component_op = components.create_component_from_func(my_component)
-@dsl.pipeline(name='my-component')
+@dsl.pipeline(name='test-pipeline', pipeline_root='dummy_root')
def my_pipeline(datasets):
component_op(datasets=datasets)
@@ -305,7 +300,6 @@ class CompilerTest(unittest.TestCase):
'output. However it is used with InputPathPlaceholder.'):
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy',
package_path='output.json')
def test_set_pipeline_root_through_pipeline_decorator(self):
@@ -313,7 +307,7 @@ class CompilerTest(unittest.TestCase):
tmpdir = tempfile.mkdtemp()
try:
-@dsl.pipeline(name='my-pipeline', pipeline_root='gs://path')
+@dsl.pipeline(name='test-pipeline', pipeline_root='gs://path')
def my_pipeline():
pass
@@ -329,42 +323,20 @@ class CompilerTest(unittest.TestCase):
finally:
shutil.rmtree(tmpdir)
-def test_set_pipeline_root_through_compile_method(self):
-tmpdir = tempfile.mkdtemp()
-try:
-@dsl.pipeline(name='my-pipeline', pipeline_root='gs://path')
-def my_pipeline():
-pass
-target_json_file = os.path.join(tmpdir, 'result.json')
-compiler.Compiler().compile(
-pipeline_func=my_pipeline,
-pipeline_root='gs://path-override',
-package_path=target_json_file)
-self.assertTrue(os.path.exists(target_json_file))
-with open(target_json_file) as f:
-job_spec = json.load(f)
-self.assertEqual('gs://path-override',
-job_spec['runtimeConfig']['gcsOutputDirectory'])
-finally:
-shutil.rmtree(tmpdir)
def test_missing_pipeline_root_is_allowed_but_warned(self):
tmpdir = tempfile.mkdtemp()
try:
-@dsl.pipeline(name='my-pipeline')
+@dsl.pipeline(name='test-pipeline')
def my_pipeline():
pass
target_json_file = os.path.join(tmpdir, 'result.json')
with self.assertWarnsRegex(UserWarning, 'pipeline_root is None or empty'):
compiler.Compiler().compile(
-pipeline_func=my_pipeline, package_path=target_json_file)
+pipeline_func=my_pipeline,
+package_path=target_json_file)
self.assertTrue(os.path.exists(target_json_file))
with open(target_json_file) as f:

View File

@@ -33,10 +33,6 @@ def parse_arguments() -> argparse.Namespace:
'--function',
type=str,
help='The name of the function to compile if there are multiple.')
-parser.add_argument(
-'--pipeline-root',
-type=str,
-help='The root of the pipeline outputs.')
parser.add_argument(
'--pipeline-parameters',
type=json.loads,
@@ -58,7 +54,7 @@ def parse_arguments() -> argparse.Namespace:
def _compile_pipeline_function(pipeline_funcs: List[Callable],
-function_name: Optional[str], pipeline_root: str,
+function_name: Optional[str],
pipeline_parameters: Optional[Mapping[str, Any]],
package_path: str, type_check: bool) -> None:
"""Compiles a pipeline function.
@@ -67,7 +63,6 @@ def _compile_pipeline_function(pipeline_funcs: List[Callable],
pipeline_funcs: A list of pipeline_functions.
function_name: The name of the pipeline function to compile if there were
multiple.
-pipeline_root: The root output directory for pipeline runtime.
pipeline_parameters: The pipeline parameters as a dict of {name: value}.
package_path: The output path of the compiled result.
type_check: Whether to enable the type checking.
@@ -94,7 +89,6 @@ def _compile_pipeline_function(pipeline_funcs: List[Callable],
compiler.Compiler().compile(
pipeline_func=pipeline_func,
-pipeline_root=pipeline_root,
pipeline_parameters=pipeline_parameters,
package_path=package_path,
type_check=type_check)
@@ -118,7 +112,6 @@ class PipelineCollectorContext():
def compile_pyfile(pyfile: str, function_name: Optional[str],
-pipeline_root: str,
pipeline_parameters: Optional[Mapping[str, Any]],
package_path: str, type_check: bool) -> None:
"""Compiles a pipeline written in a .py file.
@@ -126,7 +119,6 @@ def compile_pyfile(pyfile: str, function_name: Optional[str],
Args:
pyfile: The path to the .py file that contains the pipeline definition.
function_name: The name of the pipeline function.
-pipeline_root: The root output directory for pipeline runtime.
pipeline_parameters: The pipeline parameters as a dict of {name: value}.
package_path: The output path of the compiled result.
type_check: Whether to enable the type checking.
@@ -139,7 +131,6 @@ def compile_pyfile(pyfile: str, function_name: Optional[str],
_compile_pipeline_function(
pipeline_funcs=pipeline_funcs,
function_name=function_name,
-pipeline_root=pipeline_root,
pipeline_parameters=pipeline_parameters,
package_path=package_path,
type_check=type_check)
@@ -154,7 +145,6 @@ def main():
compile_pyfile(
pyfile=args.py,
function_name=args.function,
-pipeline_root=args.pipeline_root,
pipeline_parameters=args.pipeline_parameters,
package_path=args.output,
type_check=not args.disable_type_check,
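
After the removal, the CLI helper takes one fewer argument. A sketch of calling it directly under the new signature; the module path kfp.v2.compiler.main and the file and function names are assumptions for illustration:

    from kfp.v2.compiler.main import compile_pyfile  # assumed module path

    compile_pyfile(
        pyfile='my_pipeline.py',      # placeholder pipeline file
        function_name='my_pipeline',  # placeholder function name
        pipeline_parameters={'text': 'Hello KFP!'},
        package_path='pipeline_job.json',
        type_check=True)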

View File

@@ -38,8 +38,7 @@ class CompilerCliTests(unittest.TestCase):
else:
target_json = os.path.join(tmpdir, file_base_name + '-pipeline.json')
subprocess.check_call([
-'dsl-compile-v2', '--py', py_file, '--pipeline-root', 'dummy_root',
-'--output', target_json
+'dsl-compile-v2', '--py', py_file, '--output', target_json
] + additional_arguments)
with open(golden_compiled_file, 'r') as f:

View File

@@ -97,5 +97,4 @@ def pipeline(message: str):
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))

View File

@@ -78,5 +78,4 @@ def pipeline(first_message: str, second_message: str, first_number: int,
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))

View File

@@ -33,7 +33,7 @@ implementation:
""")
-@dsl.pipeline(name='pipeline-with-after')
+@dsl.pipeline(name='pipeline-with-after', pipeline_root='dummy_root')
def my_pipeline():
task1 = component_op(text='1st task')
task2 = component_op(text='2nd task').after(task1)
@@ -43,5 +43,4 @@ def my_pipeline():
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))

View File

@@ -23,7 +23,8 @@ component_op = components.load_component_from_file(
str(test_data_dir / 'concat_placeholder_component.yaml'))
-@dsl.pipeline(name='one-step-pipeline-with-concat-placeholder')
+@dsl.pipeline(name='one-step-pipeline-with-concat-placeholder',
+pipeline_root='dummy_root')
def my_pipeline():
component = component_op(input_prefix='some prefix:')
@@ -31,5 +32,4 @@ def my_pipeline():
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))

View File

@@ -34,7 +34,7 @@ flip_coin_op = components.create_component_from_func(flip_coin)
print_op = components.create_component_from_func(print_msg)
-@dsl.pipeline(name='single-condition-pipeline')
+@dsl.pipeline(name='single-condition-pipeline', pipeline_root='dummy_root')
def my_pipeline(text: str = 'condition test'):
flip1 = flip_coin_op()
print_op(flip1.output)

View File

@@ -22,7 +22,7 @@ def print_op(text: str):
print(text)
-@dsl.pipeline(name='pipeline-with-custom-job-spec')
+@dsl.pipeline(name='pipeline-with-custom-job-spec', pipeline_root='dummy_root')
def my_pipeline():
# Normal container execution.
@@ -58,5 +58,4 @@ def my_pipeline():
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))

View File

@@ -23,7 +23,8 @@ component_op = components.load_component_from_file(
str(test_data_dir / 'if_placeholder_component.yaml'))
-@dsl.pipeline(name='one-step-pipeline-with-if-placeholder')
+@dsl.pipeline(name='one-step-pipeline-with-if-placeholder',
+pipeline_root='dummy_root')
def my_pipeline(input0: str, input1: str, input2: str):
# supply only optional_input_1 but not optional_input_2
component = component_op(required_input=input0, optional_input_1=input1)
@@ -32,5 +33,4 @@ def my_pipeline(input0: str, input1: str, input2: str):
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))

View File

@@ -34,7 +34,7 @@ flip_coin_op = components.create_component_from_func(flip_coin)
print_op = components.create_component_from_func(print_msg)
-@dsl.pipeline(name='nested-conditions-pipeline')
+@dsl.pipeline(name='nested-conditions-pipeline', pipeline_root='dummy_root')
def my_pipeline():
flip1 = flip_coin_op()
print_op(flip1.output)
@@ -52,5 +52,4 @@ def my_pipeline():
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))

View File

@@ -67,6 +67,7 @@ print_op = components.load_component_from_text("""
@dsl.pipeline(
name='conditional-execution-pipeline',
+pipeline_root='dummy_root',
description='Shows how to use dsl.Condition().')
def my_pipeline():
flip = flip_coin_op()
@@ -88,5 +89,4 @@ def my_pipeline():
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))

View File

@@ -28,6 +28,7 @@ training_op = components.load_component_from_file(
@dsl.pipeline(
name='two-step-pipeline-with-ontology',
+pipeline_root='dummy_root',
description='A linear two-step pipeline with artifact ontology types.')
def my_pipeline(input_location: str = 'gs://test-bucket/pipeline_root',
optimizer: str = 'sgd',
@@ -42,5 +43,4 @@ def my_pipeline(input_location: str = 'gs://test-bucket/pipeline_root',
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))

View File

@@ -23,7 +23,8 @@ def print_op(name: str) -> str:
return name
-@dsl.pipeline(name='pipeline-with-pipelineparam-containing-format')
+@dsl.pipeline(name='pipeline-with-pipelineparam-containing-format',
+pipeline_root='dummy_root')
def my_pipeline(name: str = 'KFP'):
print_task = print_op('Hello {}'.format(name))
print_op('{}, again.'.format(print_task.output))
@@ -32,5 +33,4 @@ def my_pipeline(name: str = 'KFP'):
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))

View File

@@ -28,6 +28,7 @@ training_op = components.load_component_from_file(
@dsl.pipeline(
name='two-step-pipeline-with-resource-spec',
+pipeline_root='dummy_root',
description='A linear two-step pipeline with resource specification.')
def my_pipeline(input_location: str = 'gs://test-bucket/pipeline_root',
optimizer: str = 'sgd',

View File

@@ -23,7 +23,7 @@ add_op = components.load_component_from_file(
str(test_data_dir / 'add_component.yaml'))
-@dsl.pipeline(name='add-pipeline')
+@dsl.pipeline(name='add-pipeline', pipeline_root='dummy_root')
def my_pipeline(
a: int = 2,
b: int = 5,
@@ -36,5 +36,4 @@ def my_pipeline(
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))

View File

@@ -66,7 +66,7 @@ implementation:
""")
-@dsl.pipeline(name='pipeline-with-various-types')
+@dsl.pipeline(name='pipeline-with-various-types', pipeline_root='dummy_root')
def my_pipeline(input1: str,
input3,
input4='',
@@ -92,5 +92,4 @@ def my_pipeline(input1: str,
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))

View File

@@ -54,7 +54,7 @@ implementation:
""")
-@dsl.pipeline(name='simple-two-step-pipeline')
+@dsl.pipeline(name='simple-two-step-pipeline', pipeline_root='dummy_root')
def my_pipeline(text: str = 'Hello world!'):
component_1 = component_op_1(text=text)
component_2 = component_op_2(
@@ -64,6 +64,5 @@ def my_pipeline(text: str = 'Hello world!'):
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy_root',
pipeline_parameters={'text': 'Hello KFP!'},
package_path=__file__.replace('.py', '.json'))

View File

@@ -27,6 +27,7 @@ serving_op = components.load_component_from_file(
@dsl.pipeline(
name='two-step-pipeline-with-importer',
+pipeline_root='dummy_root',
description='A linear two-step pipeline.')
def my_pipeline(input_gcs = 'gs://test-bucket/pipeline_root',
optimizer: str = 'sgd',
@@ -41,5 +42,4 @@ def my_pipeline(input_gcs = 'gs://test-bucket/pipeline_root',
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))

View File

@@ -36,7 +36,7 @@ xgboost_predict_on_parquet_op = components.load_component_from_url(
)
-@dsl.pipeline(name='xgboost-sample-pipeline')
+@dsl.pipeline(name='xgboost-sample-pipeline', pipeline_root='dummy_root')
def xgboost_pipeline():
training_data_csv = chicago_taxi_dataset_op(
where='trip_start_timestamp >= "2019-01-01" AND trip_start_timestamp < "2019-02-01"',
@@ -92,5 +92,4 @@ def xgboost_pipeline():
if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=xgboost_pipeline,
-pipeline_root='dummy_root',
package_path=__file__.replace('.py', '.json'))