diff --git a/samples/core/dsl_static_type_checking/DSL Static Type Checking.ipynb b/samples/core/dsl_static_type_checking/dsl_static_type_checking.ipynb
similarity index 100%
rename from samples/core/dsl_static_type_checking/DSL Static Type Checking.ipynb
rename to samples/core/dsl_static_type_checking/dsl_static_type_checking.ipynb
diff --git a/samples/core/lightweight_component/Lightweight Python components - basics.ipynb b/samples/core/lightweight_component/lightweight_component.ipynb
similarity index 100%
rename from samples/core/lightweight_component/Lightweight Python components - basics.ipynb
rename to samples/core/lightweight_component/lightweight_component.ipynb
diff --git a/test/sample-test/check_notebook_results.py b/test/sample-test/check_notebook_results.py
index ca69af8cf3..9042097105 100644
--- a/test/sample-test/check_notebook_results.py
+++ b/test/sample-test/check_notebook_results.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import subprocess
 import utils
 import yaml
 
@@ -20,23 +21,28 @@ from kfp import Client
 
 class NoteBookChecker(object):
-    def __init__(self, testname, result, exit_code,
-                 experiment=None, namespace='kubeflow'):
+    def __init__(self, testname, result, run_pipeline, namespace='kubeflow'):
         """ Util class for checking notebook sample test running results.
 
         :param testname: test name in the json xml.
         :param result: name of the file that stores the test result
-        :param exit_code: the exit code of the notebook run. 0 for passed test.
-        :param experiment: where the test run belong, only necessary when a job is submitted.
+        :param run_pipeline: whether to submit the sample for a pipeline run.
         :param namespace: where the pipeline system is deployed.
         """
         self._testname = testname
         self._result = result
-        self._exit_code = exit_code
-        self._experiment = experiment
+        self._exit_code = None
+        self._run_pipeline = run_pipeline
         self._namespace = namespace
 
+    def run(self):
+        """ Run the notebook sample as a Python script. """
+        self._exit_code = str(
+            subprocess.call(['ipython', '%s.py' % self._testname]))
+
+    def check(self):
+        """ Check the pipeline run results of the notebook sample. """
         test_cases = []
         test_name = self._testname + ' Sample Test'
@@ -56,13 +62,14 @@ class NoteBookChecker(object):
         else:
             test_timeout = raw_args['test_timeout']
 
-        if self._experiment is not None: # Bypassing dsl type check sample.
+        if self._run_pipeline:
+            experiment = self._testname + '-test'
             ###### Initialization ######
             host = 'ml-pipeline.%s.svc.cluster.local:8888' % self._namespace
             client = Client(host=host)
 
             ###### Get experiments ######
-            experiment_id = client.get_experiment(experiment_name=self._experiment).id
+            experiment_id = client.get_experiment(experiment_name=experiment).id
 
             ###### Get runs ######
             list_runs_response = client.list_runs(page_size=RUN_LIST_PAGE_SIZE,
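For orientation, a minimal sketch of how the reworked checker is driven; the call sequence mirrors `run_test` in `sample_test_launcher.py` further down, and the sample name, result file, and directory are placeholders:

```python
import os

from check_notebook_results import NoteBookChecker

# Placeholder sample name and result file; any notebook sample that has
# been converted to <testname>.py (see the nbconvert step in
# sample_test_launcher.py below) is driven the same way.
checker = NoteBookChecker(testname='lightweight_component',
                          result='junit_SampleOutput.xml',
                          run_pipeline=True)
checker.run()                 # runs <testname>.py via ipython and records the exit code
os.chdir('test/sample-test')  # run_test() switches back to TEST_DIR before checking
checker.check()               # reads configs, queries the KFP API, writes the junit XML
```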
diff --git a/test/sample-test/configs/default.config.yaml b/test/sample-test/configs/default.config.yaml
index 2b2b6a807b..24acd64c00 100644
--- a/test/sample-test/configs/default.config.yaml
+++ b/test/sample-test/configs/default.config.yaml
@@ -13,4 +13,5 @@
 # limitations under the License.
 
 test_name: default_sample_test
-test_timeout: 1200
\ No newline at end of file
+test_timeout: 1200
+run_pipeline: True
\ No newline at end of file
diff --git a/test/sample-test/configs/dsl_static_type_checking.config.yaml b/test/sample-test/configs/dsl_static_type_checking.config.yaml
new file mode 100644
index 0000000000..0b43ffbca4
--- /dev/null
+++ b/test/sample-test/configs/dsl_static_type_checking.config.yaml
@@ -0,0 +1,16 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+test_name: dsl_static_type_checking
+run_pipeline: False
\ No newline at end of file
diff --git a/test/sample-test/configs/lightweight_component.config.yaml b/test/sample-test/configs/lightweight_component.config.yaml
new file mode 100644
index 0000000000..2391157524
--- /dev/null
+++ b/test/sample-test/configs/lightweight_component.config.yaml
@@ -0,0 +1,17 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+test_name: lightweight_component
+notebook_params:
+  EXPERIMENT_NAME: lightweight_component-test
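The `notebook_params` map above is what the launcher hands to papermill when it parameterizes a notebook sample (see the `_compile` changes in `sample_test_launcher.py` below). A standalone sketch of that hand-off, with the sample name and config path as placeholders:

```python
import subprocess

import papermill as pm
import yaml

test_name = 'lightweight_component'  # placeholder sample name

# Read the notebook parameters from the sample's config file
# (path is a placeholder; the launcher resolves it from CONFIG_DIR).
with open('configs/%s.config.yaml' % test_name, 'r') as f:
    nb_params = yaml.safe_load(f).get('notebook_params', {})

# prepare_only=True injects the parameters without executing any cells;
# nbconvert then flattens the notebook into <test_name>.py for the checker.
pm.execute_notebook(input_path='%s.ipynb' % test_name,
                    output_path='%s.ipynb' % test_name,
                    parameters=nb_params,
                    prepare_only=True)
subprocess.call(['jupyter', 'nbconvert', '--to', 'python', '%s.ipynb' % test_name])
```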
diff --git a/test/sample-test/configs/schema.config.yaml b/test/sample-test/configs/schema.config.yaml
index 25e578f39c..7d6b2bc85e 100644
--- a/test/sample-test/configs/schema.config.yaml
+++ b/test/sample-test/configs/schema.config.yaml
@@ -15,4 +15,6 @@
 # This is the schema for sample test config yaml file.
 
 test_name: str()
 arguments: map(required=False)
+notebook_params: map(required=False)
 test_timeout: int(min=0, required=False)
+run_pipeline: bool(required=False)
\ No newline at end of file
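Both checkers consume this schema through yamale, using exactly the `make_schema`/`make_data`/`validate` calls that appear in the diffs below. A minimal validation sketch, assuming the configs live in a local `configs/` directory (the real code resolves SCHEMA_CONFIG, DEFAULT_CONFIG, and CONFIG_DIR from `constants.py`):

```python
import yamale
import yaml

schema = yamale.make_schema('configs/schema.config.yaml')
data = yamale.make_data('configs/lightweight_component.config.yaml')
yamale.validate(schema, data)  # raises ValueError if the config violates the schema

# The validated file is then re-read with plain yaml to pull out the args.
with open('configs/lightweight_component.config.yaml', 'r') as f:
    raw_args = yaml.safe_load(f)
print(raw_args['notebook_params'])  # {'EXPERIMENT_NAME': 'lightweight_component-test'}
```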
diff --git a/test/sample-test/run_sample_test.py b/test/sample-test/run_sample_test.py
index 2cb99afc9f..ee9d6f568d 100644
--- a/test/sample-test/run_sample_test.py
+++ b/test/sample-test/run_sample_test.py
@@ -37,34 +37,44 @@ class PySampleChecker(object):
         self._output = output
         self._result = result
         self._namespace = namespace
+        self._run_pipeline = None
+        self._test_timeout = None
+
+        self._test_cases = []
+        self._test_name = self._testname + ' Sample Test'
+
+        self._client = None
+        self._experiment_id = None
+        self._job_name = None
+        self._test_args = None
+        self._run_id = None
+
+    def run(self):
+        """Run the compiled KFP pipeline."""
 
-    def check(self):
-        """Run sample test and check results."""
-        test_cases = []
-        test_name = self._testname + ' Sample Test'
         ###### Initialization ######
         host = 'ml-pipeline.%s.svc.cluster.local:8888' % self._namespace
-        client = Client(host=host)
+        self._client = Client(host=host)
 
         ###### Check Input File ######
-        utils.add_junit_test(test_cases, 'input generated yaml file',
+        utils.add_junit_test(self._test_cases, 'input generated yaml file',
                              os.path.exists(self._input), 'yaml file is not generated')
         if not os.path.exists(self._input):
-            utils.write_junit_xml(test_name, self._result, test_cases)
+            utils.write_junit_xml(self._test_name, self._result, self._test_cases)
             print('Error: job not found.')
             exit(1)
 
         ###### Create Experiment ######
         experiment_name = self._testname + ' sample experiment'
-        response = client.create_experiment(experiment_name)
-        experiment_id = response.id
-        utils.add_junit_test(test_cases, 'create experiment', True)
+        response = self._client.create_experiment(experiment_name)
+        self._experiment_id = response.id
+        utils.add_junit_test(self._test_cases, 'create experiment', True)
 
         ###### Create Job ######
-        job_name = self._testname + '_sample'
+        self._job_name = self._testname + '_sample'
 
         ###### Figure out arguments from associated config files. #######
-        test_args = {}
+        self._test_args = {}
         config_schema = yamale.make_schema(SCHEMA_CONFIG)
         try:
             with open(DEFAULT_CONFIG, 'r') as f:
@@ -76,70 +86,79 @@ class PySampleChecker(object):
         except OSError as ose:
             raise FileExistsError('Default config not found:{}'.format(ose))
         else:
-            test_timeout = raw_args['test_timeout']
+            self._test_timeout = raw_args['test_timeout']
+            self._run_pipeline = raw_args['run_pipeline']
 
         try:
             with open(os.path.join(CONFIG_DIR, '%s.config.yaml' % self._testname), 'r') as f:
                 raw_args = yaml.safe_load(f)
             test_config = yamale.make_data(os.path.join(
-                CONFIG_DIR, '%s.config.yaml' % self._testname))
+                  CONFIG_DIR, '%s.config.yaml' % self._testname))
             yamale.validate(config_schema, test_config)  # If fails, a ValueError will be raised.
         except yaml.YAMLError as yamlerr:
             print('No legit yaml config file found, use default args:{}'.format(yamlerr))
         except OSError as ose:
             print('Config file with the same name not found, use default args:{}'.format(ose))
         else:
-            test_args.update(raw_args['arguments'])
-            if 'output' in test_args.keys():  # output is a special param that has to be specified dynamically.
-                test_args['output'] = self._output
+            self._test_args.update(raw_args['arguments'])
+            if 'output' in self._test_args.keys():  # output is a special param that has to be specified dynamically.
+                self._test_args['output'] = self._output
             if 'test_timeout' in raw_args.keys():
-                test_timeout = raw_args['test_timeout']
+                self._test_timeout = raw_args['test_timeout']
+            if 'run_pipeline' in raw_args.keys():
+                self._run_pipeline = raw_args['run_pipeline']
 
-        response = client.run_pipeline(experiment_id, job_name, self._input, test_args)
-        run_id = response.id
-        utils.add_junit_test(test_cases, 'create pipeline run', True)
+        # Submit the pipeline for running.
+        if self._run_pipeline:
+            response = self._client.run_pipeline(self._experiment_id, self._job_name, self._input, self._test_args)
+            self._run_id = response.id
+            utils.add_junit_test(self._test_cases, 'create pipeline run', True)
 
-        ###### Monitor Job ######
-        try:
-            start_time = datetime.now()
-            response = client.wait_for_run_completion(run_id, test_timeout)
-            succ = (response.run.status.lower() == 'succeeded')
-            end_time = datetime.now()
-            elapsed_time = (end_time - start_time).seconds
-            utils.add_junit_test(test_cases, 'job completion', succ,
-                                 'waiting for job completion failure', elapsed_time)
-        finally:
-            ###### Output Argo Log for Debugging ######
-            workflow_json = client._get_workflow_json(run_id)
-            workflow_id = workflow_json['metadata']['name']
-            argo_log, _ = utils.run_bash_command('argo logs -n {} -w {}'.format(
-                self._namespace, workflow_id))
-            print('=========Argo Workflow Log=========')
-            print(argo_log)
-        if not succ:
-            utils.write_junit_xml(test_name, self._result, test_cases)
-            exit(1)
+    def check(self):
+        """Check the pipeline run results."""
+        if self._run_pipeline:
+            ###### Monitor Job ######
+            try:
+                start_time = datetime.now()
+                response = self._client.wait_for_run_completion(self._run_id, self._test_timeout)
+                succ = (response.run.status.lower() == 'succeeded')
+                end_time = datetime.now()
+                elapsed_time = (end_time - start_time).seconds
+                utils.add_junit_test(self._test_cases, 'job completion', succ,
+                                     'waiting for job completion failure', elapsed_time)
+            finally:
+                ###### Output Argo Log for Debugging ######
+                workflow_json = self._client._get_workflow_json(self._run_id)
+                workflow_id = workflow_json['metadata']['name']
+                argo_log, _ = utils.run_bash_command('argo logs -n {} -w {}'.format(
+                    self._namespace, workflow_id))
+                print('=========Argo Workflow Log=========')
+                print(argo_log)
 
-        ###### Validate the results for specific test cases ######
-        #TODO: Add result check for tfx-cab-classification after launch.
-        if self._testname == 'xgboost_training_cm':
-            # For xgboost sample, check its confusion matrix.
-            cm_tar_path = './confusion_matrix.tar.gz'
-            utils.get_artifact_in_minio(workflow_json, 'confusion-matrix', cm_tar_path,
-                                        'mlpipeline-ui-metadata')
-            with tarfile.open(cm_tar_path) as tar_handle:
-                file_handles = tar_handle.getmembers()
-                assert len(file_handles) == 1
+            if not succ:
+                utils.write_junit_xml(self._test_name, self._result, self._test_cases)
+                exit(1)
 
-                with tar_handle.extractfile(file_handles[0]) as f:
-                    cm_data = f.read()
-                    utils.add_junit_test(test_cases, 'confusion matrix format',
-                                         (len(cm_data) > 0),
-                                         'the confusion matrix file is empty')
+            ###### Validate the results for specific test cases ######
+            #TODO: Add result check for tfx-cab-classification after launch.
+            if self._testname == 'xgboost_training_cm':
+                # For xgboost sample, check its confusion matrix.
+                cm_tar_path = './confusion_matrix.tar.gz'
+                utils.get_artifact_in_minio(workflow_json, 'confusion-matrix', cm_tar_path,
+                                            'mlpipeline-ui-metadata')
+                with tarfile.open(cm_tar_path) as tar_handle:
+                    file_handles = tar_handle.getmembers()
+                    assert len(file_handles) == 1
+
+                    with tar_handle.extractfile(file_handles[0]) as f:
+                        cm_data = f.read()
+                        utils.add_junit_test(self._test_cases, 'confusion matrix format',
+                                             (len(cm_data) > 0),
+                                             'the confusion matrix file is empty')
 
         ###### Delete Job ######
         #TODO: add deletion when the backend API offers the interface.
 
         ###### Write out the test result in junit xml ######
-        utils.write_junit_xml(test_name, self._result, test_cases)
+        utils.write_junit_xml(self._test_name, self._result, self._test_cases)
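As with the notebook checker, submission and verification are now two separate calls. A sketch of the intended sequence; the values are placeholders, and `run_test` in `sample_test_launcher.py` below fills them in from the launcher's state:

```python
from run_sample_test import PySampleChecker

checker = PySampleChecker(testname='xgboost_training_cm',       # placeholder sample
                          input='xgboost_training_cm.yaml',     # compiled pipeline file
                          output='gs://my-bucket/results',      # placeholder GCS dir
                          result='junit_SampleOutput.xml',
                          namespace='kubeflow')
checker.run()    # creates the experiment and, when run_pipeline is True, submits the run
checker.check()  # waits for completion, dumps Argo logs, writes the junit XML
```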
diff --git a/test/sample-test/sample_test_launcher.py b/test/sample-test/sample_test_launcher.py
index 1d83973793..196c75d217 100644
--- a/test/sample-test/sample_test_launcher.py
+++ b/test/sample-test/sample_test_launcher.py
@@ -19,10 +19,13 @@ decides which test to trigger based upon the arguments provided.
 import fire
 import os
 import papermill as pm
+import re
 import subprocess
 import utils
+import yamale
+import yaml
 
-from constants import PAPERMILL_ERR_MSG, BASE_DIR, TEST_DIR
+from constants import PAPERMILL_ERR_MSG, BASE_DIR, TEST_DIR, SCHEMA_CONFIG, CONFIG_DIR, DEFAULT_CONFIG
 from check_notebook_results import NoteBookChecker
 from run_sample_test import PySampleChecker
 
@@ -43,47 +46,14 @@ class SampleTest(object):
     # Capture the first segment after gs:// as the project name.
     self._bucket_name = results_gcs_dir.split('/')[2]
     self._target_image_prefix = target_image_prefix
+    self._is_notebook = None
     self._namespace = namespace
     self._sample_test_result = 'junit_Sample%sOutput.xml' % self._test_name
     self._sample_test_output = self._results_gcs_dir
     self._work_dir = os.path.join(BASE_DIR, 'samples/core/', self._test_name)
 
-  def check_result(self):
-    os.chdir(TEST_DIR)
-    pysample_checker = PySampleChecker(testname=self._test_name,
-                                       input=os.path.join(self._work_dir, '%s.yaml' % self._test_name),
-                                       output=self._sample_test_output,
-                                       result=self._sample_test_result,
-                                       namespace=self._namespace)
-    pysample_checker.check()
-
-    print('Copy the test results to GCS %s/' % self._results_gcs_dir)
-    utils.upload_blob(
-        self._bucket_name,
-        self._sample_test_result,
-        os.path.join(self._results_gcs_dir, self._sample_test_result)
-    )
-
-  def check_notebook_result(self):
-    # Workaround because papermill does not directly return exit code.
-    exit_code = '1' if PAPERMILL_ERR_MSG in \
-                       open('%s.ipynb' % self._test_name).read() else '0'
-
-    os.chdir(TEST_DIR)
-
-    if self._test_name == 'dsl_static_type_checking':
-      nbchecker = NoteBookChecker(testname=self._test_name,
-                                  result=self._sample_test_result,
-                                  exit_code=exit_code)
-      nbchecker.check()
-    else:
-      nbchecker = NoteBookChecker(testname=self._test_name,
-                                  result=self._sample_test_result,
-                                  exit_code=exit_code,
-                                  experiment=None,
-                                  namespace='kubeflow')
-      nbchecker.check()
-
+  def _copy_result(self):
+    """ Copy the generated sample test result to GCS so that Prow can pick it up. """
     print('Copy the test results to GCS %s/' % self._results_gcs_dir)
     utils.upload_blob(
@@ -92,37 +62,107 @@ class SampleTest(object):
         os.path.join(self._results_gcs_dir, self._sample_test_result)
     )
 
-  def _compile_sample(self):
+  def _compile(self):
     os.chdir(self._work_dir)
     print('Run the sample tests...')
 
+    # Look for the entry point of the test.
+    list_of_files = os.listdir('.')
+    for file in list_of_files:
+      m = re.match(self._test_name + r'\.[a-zA-Z]+', file)
+      if m:
+        file_name, ext_name = os.path.splitext(file)
+        if self._is_notebook is not None:
+          raise RuntimeError('Multiple entry points found under sample: {}'.format(self._test_name))
+        if ext_name == '.py':
+          self._is_notebook = False
+        if ext_name == '.ipynb':
+          self._is_notebook = True
+
+    if self._is_notebook is None:
+      raise RuntimeError('No entry point found for sample: {}'.format(self._test_name))
+
+    config_schema = yamale.make_schema(SCHEMA_CONFIG)
+    # Retrieve the default config.
+    try:
+      with open(DEFAULT_CONFIG, 'r') as f:
+        raw_args = yaml.safe_load(f)
+      default_config = yamale.make_data(DEFAULT_CONFIG)
+      yamale.validate(config_schema, default_config)  # If fails, a ValueError will be raised.
+    except yaml.YAMLError as yamlerr:
+      raise RuntimeError('Illegal default config:{}'.format(yamlerr))
+    except OSError as ose:
+      raise FileExistsError('Default config not found:{}'.format(ose))
+    else:
+      self._run_pipeline = raw_args['run_pipeline']
+
     # For presubmit check, do not do any image injection as for now.
     # Notebook samples need to be papermilled first.
-    if self._test_name == 'lightweight_component':
+    if self._is_notebook:
+      # Parse necessary params from the sample's config.yaml.
+      nb_params = {}
+
+      try:
+        with open(os.path.join(CONFIG_DIR, '%s.config.yaml' % self._test_name), 'r') as f:
+          raw_args = yaml.safe_load(f)
+        test_config = yamale.make_data(os.path.join(
+            CONFIG_DIR, '%s.config.yaml' % self._test_name))
+        yamale.validate(config_schema, test_config)  # If fails, a ValueError will be raised.
+      except yaml.YAMLError as yamlerr:
+        print('No legit yaml config file found, use default args:{}'.format(yamlerr))
+      except OSError as ose:
+        print('Config file with the same name not found, use default args:{}'.format(ose))
+      else:
+        if 'notebook_params' in raw_args.keys():
+          nb_params.update(raw_args['notebook_params'])
+        if 'run_pipeline' in raw_args.keys():
+          self._run_pipeline = raw_args['run_pipeline']
+
       pm.execute_notebook(
-          input_path='Lightweight Python components - basics.ipynb',
+          input_path='%s.ipynb' % self._test_name,
           output_path='%s.ipynb' % self._test_name,
-          parameters=dict(
-              EXPERIMENT_NAME='%s-test' % self._test_name
-          )
-      )
-    elif self._test_name == 'dsl_static_type_checking':
-      pm.execute_notebook(
-          input_path='DSL Static Type Checking.ipynb',
-          output_path='%s.ipynb' % self._test_name,
-          parameters={}
+          parameters=nb_params,
+          prepare_only=True
       )
+      # Convert to a Python script.
+      subprocess.call([
+          'jupyter', 'nbconvert', '--to', 'python', '%s.ipynb' % self._test_name
+      ])
+
     else:
       subprocess.call(['dsl-compile', '--py', '%s.py' % self._test_name,
                        '--output', '%s.yaml' % self._test_name])
 
+  def _injection(self):
+    """Inject images for pipeline components.
+    This is only valid for component tests.
+    """
+    pass
+
   def run_test(self):
-    self._compile_sample()
-    if self._test_name in ['lightweight_component', 'dsl_static_type_checking']:
-      self.check_notebook_result()
+    self._compile()
+    self._injection()
+    if self._is_notebook:
+      nbchecker = NoteBookChecker(testname=self._test_name,
+                                  result=self._sample_test_result,
+                                  run_pipeline=self._run_pipeline)
+      nbchecker.run()
+      os.chdir(TEST_DIR)
+      nbchecker.check()
     else:
-      self.check_result()
+      os.chdir(TEST_DIR)
+      pysample_checker = PySampleChecker(testname=self._test_name,
+                                         input=os.path.join(
+                                             self._work_dir,
+                                             '%s.yaml' % self._test_name),
+                                         output=self._sample_test_output,
+                                         result=self._sample_test_result,
+                                         namespace=self._namespace)
+      pysample_checker.run()
+      pysample_checker.check()
+
+    self._copy_result()
 
 
 class ComponentTest(SampleTest):
@@ -205,13 +245,6 @@ class ComponentTest(SampleTest):
                                      subs)
 
-  def run_test(self):
-    # compile, injection, check_result
-    self._compile_sample()
-    self._injection()
-    self.check_result()
-
-
 def main():
   """Launches either KFP sample test or component test as a command
   entrypoint.
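`main()` itself is unchanged by this diff and exposes these classes through python-fire (per the `import fire` above). Assuming it registers them under names like `sample_test` and `component_test` — an assumption, since its body is outside this diff — launching a sample test might look like:

```python
import fire

# Hypothetical registration; the actual mapping lives in main().
fire.Fire({'sample_test': SampleTest, 'component_test': ComponentTest})

# Shell usage under that assumption (flags mirror the constructor args):
#   python sample_test_launcher.py sample_test \
#       --test_name=lightweight_component \
#       --results_gcs_dir=gs://my-bucket/results \
#       run_test
```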