mirror of https://github.com/kubeflow/examples.git
				
				
				
			Fix the mnist_gcp_test.py (#741)
* Fix the mnist_gcp_test.py
* The job spec was invalid; we were missing container name
* There were a bunch of other issues as well.
* Pull in the changes from xgboost_synthetic to upload an HTML version
  of the notebook output to GCS.
* Add exceptoin
* Revert "Add exceptoin"
This reverts commit 44f34d9d74.
			
			
This commit is contained in:
		
							parent
							
								
									5b4b0c6c94
								
							
						
					
					
						commit
						b218d2b23c
					
				| 
						 | 
				
			
			@ -56,6 +56,7 @@ workflows:
 | 
			
		|||
    include_dirs:
 | 
			
		||||
      - xgboost_synthetic/*
 | 
			
		||||
      - mnist/*
 | 
			
		||||
      - py/kubeflow/examples/notebook_tests
 | 
			
		||||
      - py/kubeflow/examples/create_e2e_workflow.py
 | 
			
		||||
 | 
			
		||||
  # E2E test for various notebooks
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,4 +1,4 @@
 | 
			
		|||
import fire
 | 
			
		||||
import argparse
 | 
			
		||||
import tempfile
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
| 
						 | 
				
			
			@ -6,10 +6,13 @@ import subprocess
 | 
			
		|||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
from google.cloud import storage
 | 
			
		||||
from kubeflow.testing import util
 | 
			
		||||
 | 
			
		||||
def prepare_env():
 | 
			
		||||
  subprocess.check_call(["pip3", "install", "-U", "papermill"])
 | 
			
		||||
  subprocess.check_call(["pip3", "install", "-r", "../requirements.txt"])
 | 
			
		||||
 | 
			
		||||
  subprocess.check_call(["pip3", "install", "-U", "nbconvert"])
 | 
			
		||||
  subprocess.check_call(["pip3", "install", "-U", "nbformat"])
 | 
			
		||||
 | 
			
		||||
def execute_notebook(notebook_path, parameters=None):
 | 
			
		||||
  import papermill #pylint: disable=import-error
 | 
			
		||||
| 
						 | 
				
			
			@ -21,13 +24,32 @@ def execute_notebook(notebook_path, parameters=None):
 | 
			
		|||
                             log_output=True)
 | 
			
		||||
  return notebook_output_path
 | 
			
		||||
 | 
			
		||||
def run_notebook_test(notebook_path, expected_messages, parameters=None):
 | 
			
		||||
def _upload_notebook_html(content, target):
 | 
			
		||||
  gcs_client = storage.Client()
 | 
			
		||||
  bucket_name, path = util.split_gcs_uri(target)
 | 
			
		||||
 | 
			
		||||
  bucket = gcs_client.get_bucket(bucket_name)
 | 
			
		||||
 | 
			
		||||
  logging.info("Uploading notebook to %s.", target)
 | 
			
		||||
  blob = bucket.blob(path)
 | 
			
		||||
  # Need to set content type so that if we browse in GCS we end up rendering
 | 
			
		||||
  # as html.
 | 
			
		||||
  blob.upload_from_string(content, content_type="text/html")
 | 
			
		||||
 | 
			
		||||
def run_notebook_test(notebook_path, parameters=None):
 | 
			
		||||
  import nbformat #pylint: disable=import-error
 | 
			
		||||
  import nbconvert #pylint: disable=import-error
 | 
			
		||||
 | 
			
		||||
  output_path = execute_notebook(notebook_path, parameters=parameters)
 | 
			
		||||
  actual_output = open(output_path, 'r').read()
 | 
			
		||||
  for expected_message in expected_messages:
 | 
			
		||||
    if not expected_message in actual_output:
 | 
			
		||||
      logger.error(actual_output)
 | 
			
		||||
      assert False, "Unable to find from output: " + expected_message
 | 
			
		||||
 | 
			
		||||
  with open(output_path, "r") as hf:
 | 
			
		||||
    actual_output = hf.read()
 | 
			
		||||
 | 
			
		||||
  nb = nbformat.reads(actual_output, as_version=4)
 | 
			
		||||
  html_exporter = nbconvert.HTMLExporter()
 | 
			
		||||
  (html_output, _) = html_exporter.from_notebook_node(nb)
 | 
			
		||||
  gcs_path = os.getenv("OUTPUT_GCS")
 | 
			
		||||
  _upload_notebook_html(html_output, gcs_path)
 | 
			
		||||
 | 
			
		||||
class NotebookExecutor:
 | 
			
		||||
  @staticmethod
 | 
			
		||||
| 
						 | 
				
			
			@ -40,13 +62,7 @@ class NotebookExecutor:
 | 
			
		|||
    prepare_env()
 | 
			
		||||
    FILE_DIR = os.path.dirname(__file__)
 | 
			
		||||
 | 
			
		||||
    EXPECTED_MGS = [
 | 
			
		||||
        "Finished upload of",
 | 
			
		||||
        "Model export success: mockup-model.dat",
 | 
			
		||||
        "Pod started running True",
 | 
			
		||||
        "Cluster endpoint: http:",
 | 
			
		||||
    ]
 | 
			
		||||
    run_notebook_test(notebook_path, EXPECTED_MGS)
 | 
			
		||||
    run_notebook_test(notebook_path)
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
  logging.basicConfig(level=logging.INFO,
 | 
			
		||||
| 
						 | 
				
			
			@ -55,4 +71,13 @@ if __name__ == "__main__":
 | 
			
		|||
                      datefmt='%Y-%m-%dT%H:%M:%S',
 | 
			
		||||
                      )
 | 
			
		||||
 | 
			
		||||
  fire.Fire(NotebookExecutor)
 | 
			
		||||
  # fire isn't available in the notebook image which is why we aren't
 | 
			
		||||
  # using it.
 | 
			
		||||
  parser = argparse.ArgumentParser()
 | 
			
		||||
  parser.add_argument(
 | 
			
		||||
    "--notebook_path", default="", type=str, help=("Path to the notebook"))
 | 
			
		||||
 | 
			
		||||
  args = parser.parse_args()
 | 
			
		||||
 | 
			
		||||
  NotebookExecutor.test(args.notebook_path)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,4 +1,7 @@
 | 
			
		|||
# A batch job to run a notebook using papermill.
 | 
			
		||||
# The YAML is modified by nb_test_util.py to generate a Job specific
 | 
			
		||||
# to a notebook.
 | 
			
		||||
#
 | 
			
		||||
# TODO(jlewi): We should switch to using Tekton
 | 
			
		||||
apiVersion: batch/v1
 | 
			
		||||
kind: Job
 | 
			
		||||
| 
						 | 
				
			
			@ -36,8 +39,9 @@ spec:
 | 
			
		|||
      - env:
 | 
			
		||||
        - name: PYTHONPATH
 | 
			
		||||
          value: /src/kubeflow/examples/py/
 | 
			
		||||
      - name: executing-notebooks
 | 
			
		||||
        name: executing-notebooks
 | 
			
		||||
        image: execute-image
 | 
			
		||||
        # Command will get overwritten by nb_test_util.py
 | 
			
		||||
        command: ["python3", "-m",
 | 
			
		||||
                  "kubeflow.examples.notebook_tests.execute_notebook",
 | 
			
		||||
                  "test", "/src/kubeflow/examples/mnist/mnist_gcp.ipynb"]
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,22 +1,26 @@
 | 
			
		|||
import datetime
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
import uuid
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from kubeflow.examples.notebook_tests import nb_test_util
 | 
			
		||||
from kubeflow.testing import util
 | 
			
		||||
 | 
			
		||||
# TODO(jlewi): This test is new; there's some work to be done to make it
 | 
			
		||||
# reliable. So for now we mark it as expected to fail in presubmits
 | 
			
		||||
# We only mark it as expected to fail
 | 
			
		||||
# on presubmits because if expected failures don't show up in test grid
 | 
			
		||||
# and we want signal in postsubmits and periodics
 | 
			
		||||
@pytest.mark.xfail(os.getenv("JOB_TYPE") == "presubmit", reason="Flaky")
 | 
			
		||||
def test_mnist_gcp(record_xml_attribute, name, namespace, # pylint: disable=too-many-branches,too-many-statements
 | 
			
		||||
                   repos, image):
 | 
			
		||||
  '''Generate Job and summit.'''
 | 
			
		||||
  util.set_pytest_junit(record_xml_attribute, "test_mnist")
 | 
			
		||||
 | 
			
		||||
  if not name:
 | 
			
		||||
    name = "mnist-" + datetime.datetime.now().strftime("%H%M%S") + "-"
 | 
			
		||||
    name = name + uuid.uuid4().hex[0:3]
 | 
			
		||||
 | 
			
		||||
  util.set_pytest_junit(record_xml_attribute, "test_mnist_gcp")
 | 
			
		||||
  nb_test_util.run_papermill_job(name, namespace, repos, image)
 | 
			
		||||
 | 
			
		||||
  notebook_path = "kubeflow/examples/mnist/mnist_gcp.ipynb"
 | 
			
		||||
  nb_test_util.run_papermill_job(notebook_path, name, namespace, repos, image)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,23 +2,36 @@
 | 
			
		|||
 | 
			
		||||
import datetime
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
import uuid
 | 
			
		||||
import tempfile
 | 
			
		||||
import yaml
 | 
			
		||||
 | 
			
		||||
from google.cloud import storage
 | 
			
		||||
from kubernetes import client as k8s_client
 | 
			
		||||
from kubeflow.testing import argo_build_util
 | 
			
		||||
from kubeflow.testing import prow_artifacts
 | 
			
		||||
from kubeflow.testing import util
 | 
			
		||||
 | 
			
		||||
def run_papermill_job(name, namespace, # pylint: disable=too-many-branches,too-many-statements
 | 
			
		||||
# This is the bucket where the batch jobs will uploaded an HTML version of the
 | 
			
		||||
# notebook will be written to. The K8s job is running in a Kubeflow cluster
 | 
			
		||||
# so it needs to be a bucket that the kubeflow cluster can write to.
 | 
			
		||||
# This is why we don't write directly to the bucket used for prow artifacts
 | 
			
		||||
NB_BUCKET = "kubeflow-ci-deployment"
 | 
			
		||||
PROJECT = "kbueflow-ci-deployment"
 | 
			
		||||
 | 
			
		||||
def run_papermill_job(notebook_path, name, namespace, # pylint: disable=too-many-branches,too-many-statements
 | 
			
		||||
                      repos, image):
 | 
			
		||||
  """Generate a K8s job to run a notebook using papermill
 | 
			
		||||
 | 
			
		||||
  Args:
 | 
			
		||||
    notebook_path: Path to the notebook. This should be in the form
 | 
			
		||||
      "{REPO_OWNER}/{REPO}/path/to/notebook.ipynb"
 | 
			
		||||
    name: Name for the K8s job
 | 
			
		||||
    namespace: The namespace where the job should run.
 | 
			
		||||
    repos: (Optional) Which repos to checkout; if not specified tries
 | 
			
		||||
    repos: Which repos to checkout; if None or empty tries
 | 
			
		||||
      to infer based on PROW environment variables
 | 
			
		||||
    image:
 | 
			
		||||
    image: The docker image to run the notebook in.
 | 
			
		||||
  """
 | 
			
		||||
 | 
			
		||||
  util.maybe_activate_service_account()
 | 
			
		||||
| 
						 | 
				
			
			@ -26,6 +39,10 @@ def run_papermill_job(name, namespace, # pylint: disable=too-many-branches,too-m
 | 
			
		|||
  with open("job.yaml") as hf:
 | 
			
		||||
    job = yaml.load(hf)
 | 
			
		||||
 | 
			
		||||
  if notebook_path.startswith("/"):
 | 
			
		||||
    raise ValueError("notebook_path={0} should not start with /".format(
 | 
			
		||||
        notebook_path))
 | 
			
		||||
 | 
			
		||||
  # We need to checkout the correct version of the code
 | 
			
		||||
  # in presubmits and postsubmits. We should check the environment variables
 | 
			
		||||
  # for the prow environment variables to get the appropriate values.
 | 
			
		||||
| 
						 | 
				
			
			@ -35,6 +52,12 @@ def run_papermill_job(name, namespace, # pylint: disable=too-many-branches,too-m
 | 
			
		|||
  if not repos:
 | 
			
		||||
    repos = argo_build_util.get_repo_from_prow_env()
 | 
			
		||||
 | 
			
		||||
  if not repos:
 | 
			
		||||
    raise ValueError("Could not get repos from prow environment variable "
 | 
			
		||||
                     "and --repos isn't explicitly set")
 | 
			
		||||
 | 
			
		||||
  repos += ",kubeflow/testing@HEAD"
 | 
			
		||||
 | 
			
		||||
  logging.info("Repos set to %s", repos)
 | 
			
		||||
  job["spec"]["template"]["spec"]["initContainers"][0]["command"] = [
 | 
			
		||||
    "/usr/local/bin/checkout_repos.sh",
 | 
			
		||||
| 
						 | 
				
			
			@ -42,16 +65,55 @@ def run_papermill_job(name, namespace, # pylint: disable=too-many-branches,too-m
 | 
			
		|||
    "--src_dir=/src",
 | 
			
		||||
    "--depth=all",
 | 
			
		||||
  ]
 | 
			
		||||
 | 
			
		||||
  job["spec"]["template"]["spec"]["containers"][0]["image"] = image
 | 
			
		||||
 | 
			
		||||
  full_notebook_path = os.path.join("/src", notebook_path)
 | 
			
		||||
  job["spec"]["template"]["spec"]["containers"][0]["command"] = [
 | 
			
		||||
    "python3", "-m",
 | 
			
		||||
    "kubeflow.examples.notebook_tests.execute_notebook",
 | 
			
		||||
    "--notebook_path", full_notebook_path]
 | 
			
		||||
 | 
			
		||||
  job["spec"]["template"]["spec"]["containers"][0][
 | 
			
		||||
      "workingDir"] = os.path.dirname(full_notebook_path)
 | 
			
		||||
 | 
			
		||||
  # The prow bucket to use for results/artifacts
 | 
			
		||||
  prow_bucket = prow_artifacts.PROW_RESULTS_BUCKET
 | 
			
		||||
 | 
			
		||||
  if os.getenv("REPO_OWNER") and os.getenv("REPO_NAME"):
 | 
			
		||||
    # Running under prow
 | 
			
		||||
    prow_dir = prow_artifacts.get_gcs_dir(prow_bucket)
 | 
			
		||||
    prow_dir = os.path.join(prow_dir, "artifacts")
 | 
			
		||||
 | 
			
		||||
    if os.getenv("TEST_TARGET_NAME"):
 | 
			
		||||
      prow_dir = os.path.join(
 | 
			
		||||
        prow_dir, os.getenv("TEST_TARGET_NAME").lstrip("/"))
 | 
			
		||||
    prow_bucket, prow_path = util.split_gcs_uri(prow_dir)
 | 
			
		||||
 | 
			
		||||
  else:
 | 
			
		||||
    prow_path = "notebook-test" + datetime.datetime.now().strftime("%H%M%S")
 | 
			
		||||
    prow_path = prow_path + "-" + uuid.uuid4().hex[0:3]
 | 
			
		||||
    prow_dir = util.to_gcs_uri(prow_bucket, prow_path)
 | 
			
		||||
 | 
			
		||||
  prow_path = os.path.join(prow_path, name + ".html")
 | 
			
		||||
  output_gcs = util.to_gcs_uri(NB_BUCKET, prow_path)
 | 
			
		||||
 | 
			
		||||
  job["spec"]["template"]["spec"]["containers"][0]["env"] = [
 | 
			
		||||
    {"name": "OUTPUT_GCS", "value": output_gcs},
 | 
			
		||||
    {"name": "PYTHONPATH",
 | 
			
		||||
     "value": "/src/kubeflow/testing/py:/src/kubeflow/examples/py"},
 | 
			
		||||
  ]
 | 
			
		||||
 | 
			
		||||
  logging.info("Notebook will be written to %s", output_gcs)
 | 
			
		||||
  util.load_kube_config(persist_config=False)
 | 
			
		||||
 | 
			
		||||
  if name:
 | 
			
		||||
    job["metadata"]["name"] = name
 | 
			
		||||
  else:
 | 
			
		||||
    job["metadata"]["name"] = ("xgboost-test-" +
 | 
			
		||||
    job["metadata"]["name"] = ("notebook-test-" +
 | 
			
		||||
                               datetime.datetime.now().strftime("%H%M%S")
 | 
			
		||||
                               + "-" + uuid.uuid4().hex[0:3])
 | 
			
		||||
    name = job["metadata"]["name"]
 | 
			
		||||
  name = job["metadata"]["name"]
 | 
			
		||||
 | 
			
		||||
  job["metadata"]["namespace"] = namespace
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -70,6 +132,16 @@ def run_papermill_job(name, namespace, # pylint: disable=too-many-branches,too-m
 | 
			
		|||
 | 
			
		||||
  logging.info("Final job:\n%s", yaml.safe_dump(final_job.to_dict()))
 | 
			
		||||
 | 
			
		||||
  # Download notebook html to artifacts
 | 
			
		||||
  logging.info("Copying %s to bucket %s", output_gcs, prow_bucket)
 | 
			
		||||
 | 
			
		||||
  storage_client = storage.Client()
 | 
			
		||||
  bucket = storage_client.get_bucket(NB_BUCKET)
 | 
			
		||||
  blob = bucket.get_blob(prow_path)
 | 
			
		||||
 | 
			
		||||
  destination_bucket = storage_client.get_bucket(prow_bucket)
 | 
			
		||||
  bucket.copy_blob(blob, destination_bucket)
 | 
			
		||||
 | 
			
		||||
  if not final_job.status.conditions:
 | 
			
		||||
    raise RuntimeError("Job {0}.{1}; did not complete".format(namespace, name))
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -78,3 +150,4 @@ def run_papermill_job(name, namespace, # pylint: disable=too-many-branches,too-m
 | 
			
		|||
  if last_condition.type not in ["Complete"]:
 | 
			
		||||
    logging.error("Job didn't complete successfully")
 | 
			
		||||
    raise RuntimeError("Job {0}.{1} failed".format(namespace, name))
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -13,7 +13,6 @@ def prepare_env():
 | 
			
		|||
  subprocess.check_call(["pip3", "install", "-U", "nbformat"])
 | 
			
		||||
  subprocess.check_call(["pip3", "install", "-r", "../requirements.txt"])
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def execute_notebook(notebook_path, parameters=None):
 | 
			
		||||
  temp_dir = tempfile.mkdtemp()
 | 
			
		||||
  notebook_output_path = os.path.join(temp_dir, "out.ipynb")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue