test(v2): v2 sample test supports notebooks (#7109)

* test(v2): v2 sample test supports notebooks

* fix

* fix relative path problem
Yuan (Bob) Gong 2021-12-24 10:37:42 +08:00 committed by GitHub
parent 073539a50a
commit 44228c242b
7 changed files with 158 additions and 87 deletions

View File

@@ -26,11 +26,13 @@
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
"tags": [
"skip-in-test"
]
},
"outputs": [],
"source": [
"!python3 -m pip install 'kfp>=0.1.31' --quiet\n"
"!python3 -m pip install 'kfp>=0.1.31' --quiet"
]
},
{
@@ -40,32 +42,6 @@
"## Setup project info and imports"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"output = 'gs://[BUCKET-NAME]' # GCS bucket name\n",
"project_id = '[PROJECT-NAME]' # GCP project name\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"STAGING_GCS_PATH = os.path.join(output, 'multiple-output-sample')\n",
"TARGET_IMAGE = 'gcr.io/%s/multi-output:latest' % project_id\n",
"BASE_IMAGE = 'tensorflow/tensorflow:1.11.0-py3'"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -73,8 +49,8 @@
"outputs": [],
"source": [
"import kfp \n",
"import kfp.components as components\n",
"import kfp.dsl as dsl\n",
"from kfp import compiler\n",
"from typing import NamedTuple"
]
},
@@ -92,6 +68,7 @@
"metadata": {},
"outputs": [],
"source": [
"@components.create_component_from_func\n",
"def product_sum(a: float, b: float) -> NamedTuple(\n",
" 'output', [('product', float), ('sum', float)]):\n",
" '''Returns the product and sum of two numbers'''\n",
@@ -101,21 +78,6 @@
" return product_sum_output(a*b, a+b)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"product_sum_op = compiler.build_python_component(\n",
" component_func=product_sum,\n",
" staging_gcs_path=STAGING_GCS_PATH,\n",
" base_image=BASE_IMAGE,\n",
" target_image=TARGET_IMAGE)"
]
},
{
"cell_type": "markdown",
"metadata": {},
@@ -136,10 +98,10 @@
" description='Sample pipeline to showcase multiple outputs'\n",
")\n",
"def pipeline(a=2.0, b=2.5, c=3.0):\n",
" prod_sum_task = product_sum_op(a, b)\n",
" prod_sum_task2 = product_sum_op(b, c)\n",
" prod_sum_task3 = product_sum_op(prod_sum_task.outputs['product'],\n",
" prod_sum_task2.outputs['sum'])"
" prod_sum_task = product_sum(a, b)\n",
" prod_sum_task2 = product_sum(b, c)\n",
" prod_sum_task3 = product_sum(prod_sum_task.outputs['product'],\n",
" prod_sum_task2.outputs['sum'])"
]
},
{
@@ -152,7 +114,11 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"tags": [
"skip-in-test"
]
},
"outputs": [],
"source": [
"arguments = {\n",
@@ -166,9 +132,11 @@
],
"metadata": {
"celltoolbar": "Tags",
"interpreter": {
"hash": "c7a91a0fef823c7f839350126c5e355ea393d05f89cb40a046ebac9c8851a521"
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"display_name": "Python 3.7.10 64-bit ('v2': conda)",
"name": "python3"
},
"language_info": {
@@ -181,7 +149,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
"version": "3.7.10"
}
},
"nbformat": 4,

View File

@@ -0,0 +1,23 @@
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import kfp
from kfp.samples.test.utils import TestCase, relative_path, run_pipeline_func
run_pipeline_func([
TestCase(
pipeline_file=relative_path(__file__, 'multiple_outputs.ipynb'),
mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,
),
])
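This new test case also illustrates the "fix relative path problem" item in the commit message: sample tests may be launched from an arbitrary working directory, so the notebook path is anchored to the test module via relative_path(__file__, ...) rather than resolved against os.getcwd(). A minimal standalone sketch of the helper's behavior (it mirrors the relative_path added to utils.py below):

    import os

    def relative_path(file_path: str, rel: str) -> str:
        # Resolve `rel` against the directory that contains `file_path`.
        return os.path.join(os.path.dirname(os.path.realpath(file_path)), rel)

    # Resolves next to this file no matter what the current directory is.
    print(relative_path(__file__, 'multiple_outputs.ipynb'))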

View File

@@ -46,6 +46,8 @@
path: samples.core.XGBoost.xgboost_sample_test
- name: use_run_id
path: samples.core.use_run_info.use_run_id_test
- name: multiple_outputs
path: samples.core.multiple_outputs.multiple_outputs_test
- name: fail
path: samples.test.fail_test

View File

@@ -19,11 +19,15 @@ import logging
import os
import time
import random
import tempfile
import subprocess
from dataclasses import dataclass, asdict
from pprint import pprint
from typing import Callable, Optional
import unittest
from google.protobuf.json_format import MessageToDict
import nbformat
from nbconvert import PythonExporter
import kfp
from kfp.onprem import add_default_resource_spec
@@ -61,8 +65,9 @@ Verifier = Callable[[
@dataclass
class TestCase:
"""Test case for running a KFP sample."""
pipeline_func: Callable
"""Test case for running a KFP sample. One of pipeline_func or pipeline_file is required."""
pipeline_func: Optional[Callable] = None
pipeline_file: Optional[str] = None
mode: kfp.dsl.PipelineExecutionMode = kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE
enable_caching: bool = False
arguments: Optional[dict[str, str]] = None
@@ -81,23 +86,38 @@ def run_pipeline_func(test_cases: list[TestCase]):
def test_wrapper(
run_pipeline: Callable[
[Callable, kfp.dsl.PipelineExecutionMode, bool, dict],
[Callable, str, kfp.dsl.PipelineExecutionMode, bool, dict],
kfp_server_api.ApiRunDetail],
mlmd_connection_config: metadata_store_pb2.MetadataStoreClientConfig,
):
for case in test_cases:
pipeline_name = None
if (not case.pipeline_file) and (not case.pipeline_func):
raise ValueError(
'TestCase must have exactly one of pipeline_file or pipeline_func specified, got none.'
)
if case.pipeline_file and case.pipeline_func:
raise ValueError(
'TestCase must have exactly one of pipeline_file or pipeline_func specified, got both.'
)
if case.pipeline_func:
pipeline_name = case.pipeline_func._component_human_name
else:
pipeline_name = case.pipeline_file
if case.mode == kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE:
print('Unexpected v2 compatible mode test for: {}'.format(
case.pipeline_func._component_human_name))
print(
f'Unexpected v2 compatible mode test for: {pipeline_name}')
raise RuntimeError
if case.mode == kfp.dsl.PipelineExecutionMode.V2_ENGINE:
print('Running v2 engine mode test for: {}'.format(
case.pipeline_func._component_human_name))
print(f'Running v2 engine mode test for: {pipeline_name}')
if case.mode == kfp.dsl.PipelineExecutionMode.V1_LEGACY:
print(f'Running v1 legacy test for: {pipeline_name}')
run_detail = run_pipeline(
pipeline_func=case.pipeline_func,
pipeline_file=case.pipeline_file,
mode=case.mode,
enable_caching=case.enable_caching,
arguments=case.arguments or {})
@@ -226,7 +246,8 @@ def _run_test(callback):
client = kfp.Client(host=host)
def run_pipeline(
pipeline_func: Callable,
pipeline_func: Optional[Callable],
pipeline_file: Optional[str],
mode: kfp.dsl.PipelineExecutionMode = kfp.dsl.PipelineExecutionMode
.V2_ENGINE,
enable_caching: bool = False,
@@ -239,6 +260,7 @@ def _run_test(callback):
return run_v2_pipeline(
client=client,
fn=pipeline_func,
file=pipeline_file,
driver_image=driver_image,
launcher_v2_image=launcher_v2_image,
pipeline_root=pipeline_root,
@@ -257,16 +279,35 @@ def _run_test(callback):
memory_limit='512Mi',
))
if mode == kfp.dsl.PipelineExecutionMode.V1_LEGACY:
conf.add_op_transformer(disable_cache)
return client.create_run_from_pipeline_func(
pipeline_func,
pipeline_conf=conf,
mode=mode,
arguments=arguments,
experiment_name=experiment,
# This parameter only works for v2 compatible mode and v2 mode, it does not affect v1 mode
enable_caching=enable_caching,
)
conf.add_op_transformer(_disable_cache)
if pipeline_func:
return client.create_run_from_pipeline_func(
pipeline_func,
pipeline_conf=conf,
mode=mode,
arguments=arguments,
experiment_name=experiment,
)
else:
pyfile = pipeline_file
if pipeline_file.endswith(".ipynb"):
pyfile = tempfile.mktemp(
suffix='.py', prefix="pipeline_py_code")
_nb_sample_to_py(pipeline_file, pyfile)
from kfp.compiler.main import compile_pyfile
package_path = tempfile.mktemp(
suffix='.yaml', prefix="kfp_package")
compile_pyfile(
pyfile=pyfile,
output_path=package_path,
mode=mode,
pipeline_conf=conf,
)
return client.create_run_from_pipeline_package(
pipeline_file=package_path,
arguments=arguments,
experiment_name=experiment,
)
run_result = _retry_with_backoff(fn=_create_run)
print("Run details page URL:")
@@ -308,19 +349,26 @@ def _run_test(callback):
def run_v2_pipeline(
client: kfp.Client,
fn: Callable,
fn: Optional[Callable],
file: Optional[str],
driver_image: Optional[str],
launcher_v2_image: Optional[str],
pipeline_root: Optional[str],
enable_caching: bool,
arguments: dict[str, str],
):
import tempfile
import subprocess
original_pipeline_spec = tempfile.mktemp(
suffix='.json', prefix="original_pipeline_spec")
kfp.v2.compiler.Compiler().compile(
pipeline_func=fn, package_path=original_pipeline_spec)
if fn:
kfp.v2.compiler.Compiler().compile(
pipeline_func=fn, package_path=original_pipeline_spec)
else:
pyfile = file
if file.endswith(".ipynb"):
pyfile = tempfile.mktemp(suffix='.py', prefix="pipeline_py_code")
_nb_sample_to_py(file, pyfile)
from kfp.v2.compiler.main import compile_pyfile
compile_pyfile(pyfile=pyfile, package_path=original_pipeline_spec)
# remove following overriding logic once we use create_run_from_job_spec to trigger kfp pipeline run
with open(original_pipeline_spec) as f:
@@ -635,9 +683,30 @@ def _parse_parameters(execution: metadata_store_pb2.Execution) -> dict:
return parameters
def disable_cache(task):
def _disable_cache(task):
# Skip tasks which are not container ops.
if not isinstance(task, kfp.dsl.ContainerOp):
return task
task.execution_options.caching_strategy.max_cache_staleness = "P0D"
return task
def _nb_sample_to_py(notebook_path: str, output_path: str):
"""nb_sample_to_py converts notebook kfp sample to a python file. Cells with tag "skip-in-test" will be omitted."""
with open(notebook_path, 'r') as f:
nb = nbformat.read(f, as_version=4)
# Cells with skip-in-test tag will be omitted.
# Example code that needs the tag:
# kfp.Client().create_run_from_pipeline_func()
# so that we won't submit pipelines when compiling them.
nb.cells = [
cell for cell in nb.cells
if 'skip-in-test' not in cell.get('metadata', {}).get('tags', [])
]
py_exporter = PythonExporter()
(py_code, res) = py_exporter.from_notebook_node(nb)
with open(output_path, 'w') as out:
out.write(py_code)
def relative_path(file_path: str, relative_path: str) -> str:
return os.path.join(os.path.dirname(os.path.realpath(file_path)), relative_path)
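Taken together, the file-based branch of run_pipeline boils down to a three-step flow for a notebook sample: export, compile, submit. A hedged end-to-end sketch (paths are hypothetical, _nb_sample_to_py is the helper defined above, and a reachable KFP API server is assumed):

    import tempfile
    import kfp
    from kfp.compiler.main import compile_pyfile

    # 1. Strip 'skip-in-test' cells and export the notebook to a .py file.
    pyfile = tempfile.mktemp(suffix='.py', prefix='pipeline_py_code')
    _nb_sample_to_py('multiple_outputs.ipynb', pyfile)

    # 2. Compile the exported module into a v1 pipeline package.
    package_path = tempfile.mktemp(suffix='.yaml', prefix='kfp_package')
    compile_pyfile(pyfile=pyfile, output_path=package_path,
                   mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY)

    # 3. Submit the compiled package as a run.
    kfp.Client().create_run_from_pipeline_package(
        pipeline_file=package_path, arguments={})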

View File

@@ -27,6 +27,7 @@ setuptools.setup(
install_requires=[ # The exact versions should be determined elsewhere.
'kfp',
'ml-metadata',
'nbconvert~=6.0', # >=6.0 <7
],
classifiers=[
"Programming Language :: Python :: 3",

View File

@@ -54,9 +54,13 @@ def parse_arguments():
return args
def _compile_pipeline_function(pipeline_funcs, function_name, output_path,
type_check,
mode: Optional[dsl.PipelineExecutionMode]):
def _compile_pipeline_function(
pipeline_funcs,
function_name,
output_path,
type_check,
mode: Optional[dsl.PipelineExecutionMode] = None,
pipeline_conf: Optional[dsl.PipelineConf] = None):
if len(pipeline_funcs) == 0:
raise ValueError(
'A function with @dsl.pipeline decorator is required in the py file.'
@@ -79,7 +83,7 @@ def _compile_pipeline_function(pipeline_funcs, function_name, output_path,
pipeline_func = pipeline_funcs[0]
kfp.compiler.Compiler(mode=mode).compile(pipeline_func, output_path,
type_check)
type_check, pipeline_conf)
class PipelineCollectorContext():
@@ -99,15 +103,19 @@ class PipelineCollectorContext():
dsl._pipeline._pipeline_decorator_handler = self.old_handler
def compile_pyfile(pyfile, function_name, output_path, type_check,
mode: Optional[dsl.PipelineExecutionMode]):
def compile_pyfile(pyfile,
output_path,
function_name=None,
type_check=True,
mode: Optional[dsl.PipelineExecutionMode] = None,
pipeline_conf: Optional[dsl.PipelineConf] = None):
sys.path.insert(0, os.path.dirname(pyfile))
try:
filename = os.path.basename(pyfile)
with PipelineCollectorContext() as pipeline_funcs:
__import__(os.path.splitext(filename)[0])
_compile_pipeline_function(pipeline_funcs, function_name, output_path,
type_check, mode)
type_check, mode, pipeline_conf)
finally:
del sys.path[0]
@@ -132,8 +140,8 @@ def main():
)
compile_pyfile(
args.py,
args.function,
args.output,
args.function,
not args.disable_type_check,
mode,
)
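Since output_path now precedes function_name, positional callers had to be updated, as main() above was. A hedged sketch of the two call styles against the new signature (file names are hypothetical):

    from kfp.compiler.main import compile_pyfile

    # New positional order: pyfile, then output_path; function_name is optional.
    compile_pyfile('my_pipeline.py', 'my_pipeline.yaml')

    # Keyword form, immune to the reordering.
    compile_pyfile(pyfile='my_pipeline.py', output_path='my_pipeline.yaml',
                   function_name='pipeline', type_check=True)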

View File

@@ -122,10 +122,10 @@ class PipelineCollectorContext():
def compile_pyfile(
pyfile: str,
function_name: Optional[str],
pipeline_parameters: Optional[Mapping[str, Any]],
package_path: str,
type_check: bool,
function_name: Optional[str] = None,
pipeline_parameters: Optional[Mapping[str, Any]] = None,
type_check: bool = True,
) -> None:
"""Compiles a pipeline written in a .py file.