Improve notebook check automation (#2040)

* Add logic to detect extension name.

* Rename notebook samples

* Change to use config yaml for papermill preprocess.

* Remove ad hoc logic

* Remove duplicated logic

* Refactor

* Add run_pipeline flag in config yaml

* Add run_pipeline flag for .py samples as well.

* Fix extension name

* Fix

* Fix problems in docstring.

* Refactor run_sample_test.py into two functions

* Refactor the procedure into 3 steps

* Fix bug in exit code format

* Remove two redundant functions.
Jiaxiao Zheng 2019-09-09 13:40:55 -07:00 committed by Kubernetes Prow Robot
parent 0d81785466
commit 7b1c720a47
9 changed files with 221 additions and 126 deletions
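
Taken together, the changes split each sample test into three explicit steps: compile the sample (papermill plus nbconvert for notebooks, dsl-compile for plain .py samples), inject component images (currently a no-op for samples), and then run and check the result with the matching checker. A minimal sketch of that flow, assuming the checker classes from the diffs below; the run_one_sample helper itself is hypothetical:

    import os

    from check_notebook_results import NoteBookChecker
    from run_sample_test import PySampleChecker

    # Hypothetical helper mirroring SampleTest.run_test() further down in this
    # diff: pick the checker that matches the sample's entry point, then run
    # the sample and check its outcome.
    def run_one_sample(test_name, is_notebook, run_pipeline, work_dir,
                       output, result, namespace='kubeflow'):
        if is_notebook:
            checker = NoteBookChecker(testname=test_name,
                                      result=result,
                                      run_pipeline=run_pipeline,
                                      namespace=namespace)
        else:
            checker = PySampleChecker(testname=test_name,
                                      input=os.path.join(work_dir, '%s.yaml' % test_name),
                                      output=output,
                                      result=result,
                                      namespace=namespace)
        checker.run()    # execute the sample (ipython script or pipeline submission)
        checker.check()  # verify the outcome and write the junit xml result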


@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import subprocess
import utils
import yaml
@ -20,23 +21,28 @@ from kfp import Client
class NoteBookChecker(object):
def __init__(self, testname, result, exit_code,
experiment=None, namespace='kubeflow'):
def __init__(self, testname, result, run_pipeline, namespace='kubeflow'):
""" Util class for checking notebook sample test running results.
:param testname: test name in the json xml.
:param result: name of the file that stores the test result
:param exit_code: the exit code of the notebook run. 0 for passed test.
:param experiment: where the test run belong, only necessary when a job is submitted.
:param run_pipeline: whether to submit for a pipeline run.
:param namespace: where the pipeline system is deployed.
"""
self._testname = testname
self._result = result
self._exit_code = exit_code
self._experiment = experiment
self._exit_code = None
self._run_pipeline = run_pipeline
self._namespace = namespace
def run(self):
""" Run the notebook sample as a python script. """
self._exit_code = str(
subprocess.call(['ipython', '%s.py' % self._testname]))
def check(self):
""" Check the pipeline running results of the notebook sample. """
test_cases = []
test_name = self._testname + ' Sample Test'
@ -56,13 +62,14 @@ class NoteBookChecker(object):
else:
test_timeout = raw_args['test_timeout']
if self._experiment is not None: # Bypassing dsl type check sample.
if self._run_pipeline:
experiment = self._testname + '-test'
###### Initialization ######
host = 'ml-pipeline.%s.svc.cluster.local:8888' % self._namespace
client = Client(host=host)
###### Get experiments ######
experiment_id = client.get_experiment(experiment_name=self._experiment).id
experiment_id = client.get_experiment(experiment_name=experiment).id
###### Get runs ######
list_runs_response = client.list_runs(page_size=RUN_LIST_PAGE_SIZE,
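
When run_pipeline is enabled, check() validates the submitted pipeline run through the KFP API instead of relying only on the script's exit code. A minimal sketch of that lookup, reusing the host pattern and the '<testname>-test' experiment naming from the hunk above; the concrete values are examples:

    from kfp import Client

    # Example values; the checker derives them from its testname and namespace.
    host = 'ml-pipeline.kubeflow.svc.cluster.local:8888'
    client = Client(host=host)
    experiment_id = client.get_experiment(experiment_name='lightweight_component-test').id
    runs = client.list_runs(page_size=10, experiment_id=experiment_id)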


@ -13,4 +13,5 @@
# limitations under the License.
test_name: default_sample_test
test_timeout: 1200
test_timeout: 1200
run_pipeline: True
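
The default config now also carries run_pipeline: True, and each sample's own config may override it (the dsl_static_type_checking config below turns it off). A minimal sketch of the load-then-override resolution the checkers apply, with placeholder file paths; the real code resolves them from the DEFAULT_CONFIG and CONFIG_DIR constants:

    import yaml

    with open('default.config.yaml', 'r') as f:          # placeholder path
        args = yaml.safe_load(f)   # e.g. {'test_name': 'default_sample_test', 'test_timeout': 1200, 'run_pipeline': True}
    run_pipeline = args['run_pipeline']

    with open('dsl_static_type_checking.config.yaml', 'r') as f:
        overrides = yaml.safe_load(f)
    if 'run_pipeline' in overrides:
        run_pipeline = overrides['run_pipeline']          # False: compile only, skip submission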


@ -0,0 +1,16 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test_name: dsl_static_type_checking
run_pipeline: False


@ -0,0 +1,17 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test_name: lightweight_component
notebook_params:
EXPERIMENT_NAME: lightweight_component-test


@ -15,4 +15,6 @@
# This is the schema for sample test config yaml file.
test_name: str()
arguments: map(required=False)
notebook_params: map(required=False)
test_timeout: int(min=0, required=False)
run_pipeline: bool(required=False)
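
Both the default and the per-sample configs are validated against this yamale schema before use; a failure surfaces as a ValueError, as the checker code notes. A minimal sketch with placeholder file paths (the real code uses the SCHEMA_CONFIG and CONFIG_DIR constants):

    import yamale

    schema = yamale.make_schema('sample_test.schema.yaml')        # placeholder path
    data = yamale.make_data('lightweight_component.config.yaml')  # placeholder path
    yamale.validate(schema, data)   # raises ValueError if the config does not conform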


@ -37,34 +37,44 @@ class PySampleChecker(object):
self._output = output
self._result = result
self._namespace = namespace
self._run_pipeline = None
self._test_timeout = None
self._test_cases = []
self._test_name = self._testname + ' Sample Test'
self._client = None
self._experiment_id = None
self._job_name = None
self._test_args = None
self._run_id = None
def run(self):
"""Run compiled KFP pipeline."""
def check(self):
"""Run sample test and check results."""
test_cases = []
test_name = self._testname + ' Sample Test'
###### Initialization ######
host = 'ml-pipeline.%s.svc.cluster.local:8888' % self._namespace
client = Client(host=host)
self._client = Client(host=host)
###### Check Input File ######
utils.add_junit_test(test_cases, 'input generated yaml file',
utils.add_junit_test(self._test_cases, 'input generated yaml file',
os.path.exists(self._input), 'yaml file is not generated')
if not os.path.exists(self._input):
utils.write_junit_xml(test_name, self._result, test_cases)
utils.write_junit_xml(self._test_name, self._result, self._test_cases)
print('Error: job not found.')
exit(1)
###### Create Experiment ######
experiment_name = self._testname + ' sample experiment'
response = client.create_experiment(experiment_name)
experiment_id = response.id
utils.add_junit_test(test_cases, 'create experiment', True)
response = self._client.create_experiment(experiment_name)
self._experiment_id = response.id
utils.add_junit_test(self._test_cases, 'create experiment', True)
###### Create Job ######
job_name = self._testname + '_sample'
self._job_name = self._testname + '_sample'
###### Figure out arguments from associated config files. #######
test_args = {}
self._test_args = {}
config_schema = yamale.make_schema(SCHEMA_CONFIG)
try:
with open(DEFAULT_CONFIG, 'r') as f:
@ -76,70 +86,79 @@ class PySampleChecker(object):
except OSError as ose:
raise FileExistsError('Default config not found:{}'.format(ose))
else:
test_timeout = raw_args['test_timeout']
self._test_timeout = raw_args['test_timeout']
self._run_pipeline = raw_args['run_pipeline']
try:
with open(os.path.join(CONFIG_DIR, '%s.config.yaml' % self._testname), 'r') as f:
raw_args = yaml.safe_load(f)
test_config = yamale.make_data(os.path.join(
CONFIG_DIR, '%s.config.yaml' % self._testname))
CONFIG_DIR, '%s.config.yaml' % self._testname))
yamale.validate(config_schema, test_config) # If fails, a ValueError will be raised.
except yaml.YAMLError as yamlerr:
print('No legit yaml config file found, use default args:{}'.format(yamlerr))
except OSError as ose:
print('Config file with the same name not found, use default args:{}'.format(ose))
else:
test_args.update(raw_args['arguments'])
if 'output' in test_args.keys(): # output is a special param that has to be specified dynamically.
test_args['output'] = self._output
self._test_args.update(raw_args['arguments'])
if 'output' in self._test_args.keys(): # output is a special param that has to be specified dynamically.
self._test_args['output'] = self._output
if 'test_timeout' in raw_args.keys():
test_timeout = raw_args['test_timeout']
self._test_timeout = raw_args['test_timeout']
if 'run_pipeline' in raw_args.keys():
self._run_pipeline = raw_args['run_pipeline']
response = client.run_pipeline(experiment_id, job_name, self._input, test_args)
run_id = response.id
utils.add_junit_test(test_cases, 'create pipeline run', True)
# Submit for pipeline running.
if self._run_pipeline:
response = self._client.run_pipeline(self._experiment_id, self._job_name, self._input, self._test_args)
self._run_id = response.id
utils.add_junit_test(self._test_cases, 'create pipeline run', True)
###### Monitor Job ######
try:
start_time = datetime.now()
response = client.wait_for_run_completion(run_id, test_timeout)
succ = (response.run.status.lower() == 'succeeded')
end_time = datetime.now()
elapsed_time = (end_time - start_time).seconds
utils.add_junit_test(test_cases, 'job completion', succ,
'waiting for job completion failure', elapsed_time)
finally:
###### Output Argo Log for Debugging ######
workflow_json = client._get_workflow_json(run_id)
workflow_id = workflow_json['metadata']['name']
argo_log, _ = utils.run_bash_command('argo logs -n {} -w {}'.format(
self._namespace, workflow_id))
print('=========Argo Workflow Log=========')
print(argo_log)
if not succ:
utils.write_junit_xml(test_name, self._result, test_cases)
exit(1)
def check(self):
"""Check pipeline run results."""
if self._run_pipeline:
###### Monitor Job ######
try:
start_time = datetime.now()
response = self._client.wait_for_run_completion(self._run_id, self._test_timeout)
succ = (response.run.status.lower() == 'succeeded')
end_time = datetime.now()
elapsed_time = (end_time - start_time).seconds
utils.add_junit_test(self._test_cases, 'job completion', succ,
'waiting for job completion failure', elapsed_time)
finally:
###### Output Argo Log for Debugging ######
workflow_json = self._client._get_workflow_json(self._run_id)
workflow_id = workflow_json['metadata']['name']
argo_log, _ = utils.run_bash_command('argo logs -n {} -w {}'.format(
self._namespace, workflow_id))
print('=========Argo Workflow Log=========')
print(argo_log)
###### Validate the results for specific test cases ######
#TODO: Add result check for tfx-cab-classification after launch.
if self._testname == 'xgboost_training_cm':
# For xgboost sample, check its confusion matrix.
cm_tar_path = './confusion_matrix.tar.gz'
utils.get_artifact_in_minio(workflow_json, 'confusion-matrix', cm_tar_path,
'mlpipeline-ui-metadata')
with tarfile.open(cm_tar_path) as tar_handle:
file_handles = tar_handle.getmembers()
assert len(file_handles) == 1
if not succ:
utils.write_junit_xml(self._test_name, self._result, self._test_cases)
exit(1)
with tar_handle.extractfile(file_handles[0]) as f:
cm_data = f.read()
utils.add_junit_test(test_cases, 'confusion matrix format',
(len(cm_data) > 0),
'the confusion matrix file is empty')
###### Validate the results for specific test cases ######
#TODO: Add result check for tfx-cab-classification after launch.
if self._testname == 'xgboost_training_cm':
# For xgboost sample, check its confusion matrix.
cm_tar_path = './confusion_matrix.tar.gz'
utils.get_artifact_in_minio(workflow_json, 'confusion-matrix', cm_tar_path,
'mlpipeline-ui-metadata')
with tarfile.open(cm_tar_path) as tar_handle:
file_handles = tar_handle.getmembers()
assert len(file_handles) == 1
with tar_handle.extractfile(file_handles[0]) as f:
cm_data = f.read()
utils.add_junit_test(self._test_cases, 'confusion matrix format',
(len(cm_data) > 0),
'the confusion matrix file is empty')
###### Delete Job ######
#TODO: add deletion when the backend API offers the interface.
###### Write out the test result in junit xml ######
utils.write_junit_xml(test_name, self._result, test_cases)
utils.write_junit_xml(self._test_name, self._result, self._test_cases)
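
With this split, PySampleChecker.run() must be called before check(): run() resolves the config, creates the experiment, and (when run_pipeline is set) submits the run, recording the client and run id that check() then uses to wait for completion, dump the Argo log, and validate sample-specific artifacts before writing the junit xml. A minimal usage sketch with placeholder argument values:

    from run_sample_test import PySampleChecker

    checker = PySampleChecker(testname='xgboost_training_cm',
                              input='xgboost_training_cm.yaml',          # compiled pipeline package
                              output='gs://your-results-bucket/output',  # placeholder GCS prefix
                              result='junit_SampleTestOutput.xml',       # placeholder junit file name
                              namespace='kubeflow')
    checker.run()     # submit the pipeline run
    checker.check()   # monitor the run and validate its artifacts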


@ -19,10 +19,13 @@ decides which test to trigger based upon the arguments provided.
import fire
import os
import papermill as pm
import re
import subprocess
import utils
import yamale
import yaml
from constants import PAPERMILL_ERR_MSG, BASE_DIR, TEST_DIR
from constants import PAPERMILL_ERR_MSG, BASE_DIR, TEST_DIR, SCHEMA_CONFIG, CONFIG_DIR, DEFAULT_CONFIG
from check_notebook_results import NoteBookChecker
from run_sample_test import PySampleChecker
@ -43,47 +46,14 @@ class SampleTest(object):
# Capture the first segment after gs:// as the project name.
self._bucket_name = results_gcs_dir.split('/')[2]
self._target_image_prefix = target_image_prefix
self._is_notebook = None
self._namespace = namespace
self._sample_test_result = 'junit_Sample%sOutput.xml' % self._test_name
self._sample_test_output = self._results_gcs_dir
self._work_dir = os.path.join(BASE_DIR, 'samples/core/', self._test_name)
def check_result(self):
os.chdir(TEST_DIR)
pysample_checker = PySampleChecker(testname=self._test_name,
input=os.path.join(self._work_dir, '%s.yaml' % self._test_name),
output=self._sample_test_output,
result=self._sample_test_result,
namespace=self._namespace)
pysample_checker.check()
print('Copy the test results to GCS %s/' % self._results_gcs_dir)
utils.upload_blob(
self._bucket_name,
self._sample_test_result,
os.path.join(self._results_gcs_dir, self._sample_test_result)
)
def check_notebook_result(self):
# Workaround because papermill does not directly return exit code.
exit_code = '1' if PAPERMILL_ERR_MSG in \
open('%s.ipynb' % self._test_name).read() else '0'
os.chdir(TEST_DIR)
if self._test_name == 'dsl_static_type_checking':
nbchecker = NoteBookChecker(testname=self._test_name,
result=self._sample_test_result,
exit_code=exit_code)
nbchecker.check()
else:
nbchecker = NoteBookChecker(testname=self._test_name,
result=self._sample_test_result,
exit_code=exit_code,
experiment=None,
namespace='kubeflow')
nbchecker.check()
def _copy_result(self):
""" Copy generated sample test result to gcs, so that Prow can pick it. """
print('Copy the test results to GCS %s/' % self._results_gcs_dir)
utils.upload_blob(
@ -92,37 +62,107 @@ class SampleTest(object):
os.path.join(self._results_gcs_dir, self._sample_test_result)
)
def _compile_sample(self):
def _compile(self):
os.chdir(self._work_dir)
print('Run the sample tests...')
# Looking for the entry point of the test.
list_of_files = os.listdir('.')
for file in list_of_files:
m = re.match(self._test_name + '\.[a-zA-Z]+', file)
if m:
file_name, ext_name = os.path.splitext(file)
if self._is_notebook is not None:
raise(RuntimeError('Multiple entry points found under sample: {}'.format(self._test_name)))
if ext_name == '.py':
self._is_notebook = False
if ext_name == '.ipynb':
self._is_notebook = True
if self._is_notebook is None:
raise(RuntimeError('No entry point found for sample: {}'.format(self._test_name)))
config_schema = yamale.make_schema(SCHEMA_CONFIG)
# Retrieve default config
try:
with open(DEFAULT_CONFIG, 'r') as f:
raw_args = yaml.safe_load(f)
default_config = yamale.make_data(DEFAULT_CONFIG)
yamale.validate(config_schema, default_config) # If fails, a ValueError will be raised.
except yaml.YAMLError as yamlerr:
raise RuntimeError('Illegal default config:{}'.format(yamlerr))
except OSError as ose:
raise FileExistsError('Default config not found:{}'.format(ose))
else:
self._run_pipeline = raw_args['run_pipeline']
# For presubmit check, do not do any image injection as for now.
# Notebook samples need to be papermilled first.
if self._test_name == 'lightweight_component':
if self._is_notebook:
# Parse necessary params from config.yaml
nb_params = {}
try:
with open(os.path.join(CONFIG_DIR, '%s.config.yaml' % self._test_name), 'r') as f:
raw_args = yaml.safe_load(f)
test_config = yamale.make_data(os.path.join(
CONFIG_DIR, '%s.config.yaml' % self._test_name))
yamale.validate(config_schema, test_config) # If fails, a ValueError will be raised.
except yaml.YAMLError as yamlerr:
print('No legit yaml config file found, use default args:{}'.format(yamlerr))
except OSError as ose:
print('Config file with the same name not found, use default args:{}'.format(ose))
else:
if 'notebook_params' in raw_args.keys():
nb_params.update(raw_args['notebook_params'])
if 'run_pipeline' in raw_args.keys():
self._run_pipeline = raw_args['run_pipeline']
pm.execute_notebook(
input_path='Lightweight Python components - basics.ipynb',
input_path='%s.ipynb' % self._test_name,
output_path='%s.ipynb' % self._test_name,
parameters=dict(
EXPERIMENT_NAME='%s-test' % self._test_name
)
)
elif self._test_name == 'dsl_static_type_checking':
pm.execute_notebook(
input_path='DSL Static Type Checking.ipynb',
output_path='%s.ipynb' % self._test_name,
parameters={}
parameters=nb_params,
prepare_only=True
)
# Convert to python script.
subprocess.call([
'jupyter', 'nbconvert', '--to', 'python', '%s.ipynb' % self._test_name
])
else:
subprocess.call(['dsl-compile', '--py', '%s.py' % self._test_name,
'--output', '%s.yaml' % self._test_name])
def _injection(self):
"""Inject images for pipeline components.
This is only valid for component test
"""
pass
def run_test(self):
self._compile_sample()
if self._test_name in ['lightweight_component', 'dsl_static_type_checking']:
self.check_notebook_result()
self._compile()
self._injection()
if self._is_notebook:
nbchecker = NoteBookChecker(testname=self._test_name,
result=self._sample_test_result,
run_pipeline=self._run_pipeline)
nbchecker.run()
os.chdir(TEST_DIR)
nbchecker.check()
else:
self.check_result()
os.chdir(TEST_DIR)
pysample_checker = PySampleChecker(testname=self._test_name,
input=os.path.join(
self._work_dir,
'%s.yaml' % self._test_name),
output=self._sample_test_output,
result=self._sample_test_result,
namespace=self._namespace)
pysample_checker.run()
pysample_checker.check()
self._copy_result()
class ComponentTest(SampleTest):
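
The entry-point detection added to _compile() above decides whether a sample is a notebook by looking for exactly one <test_name>.py or <test_name>.ipynb file in the sample directory. A standalone sketch of that logic; the detect_entry_point function itself is hypothetical:

    import os
    import re

    def detect_entry_point(test_name, work_dir='.'):
        """Return True for an .ipynb entry point, False for a .py one."""
        is_notebook = None
        for file_name in os.listdir(work_dir):
            if not re.match(re.escape(test_name) + r'\.(py|ipynb)$', file_name):
                continue
            if is_notebook is not None:
                raise RuntimeError('Multiple entry points found under sample: {}'.format(test_name))
            is_notebook = file_name.endswith('.ipynb')
        if is_notebook is None:
            raise RuntimeError('No entry point found for sample: {}'.format(test_name))
        return is_notebook

For notebook samples, _compile() then parameterizes the .ipynb with papermill (prepare_only=True, using notebook_params from the config) and converts it to a .py script with nbconvert, which NoteBookChecker.run() later executes through ipython.
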
@ -205,13 +245,6 @@ class ComponentTest(SampleTest):
subs)
def run_test(self):
# compile, injection, check_result
self._compile_sample()
self._injection()
self.check_result()
def main():
"""Launches either KFP sample test or component test as a command entrypoint.