{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# This benchmark measures the performance of run related operations in Kubeflow pipelines, including run durations and latencies of creating/getting/deleting runs.\n", "\n", "import random\n", "import kfp\n", "import kfp_server_api\n", "import os\n", "import string\n", "import time\n", "from google.cloud import storage\n", "from kfp.components import create_component_from_func\n", "from datetime import datetime, timezone, timedelta\n", "\n", "import numpy as np\n", "import pandas as pd\n", "import seaborn as sns\n", "import matplotlib.pyplot as plt\n", "from scipy import stats\n", "\n", "# CHANGE necessary paramters here\n", "# host is your KFP endpoint\n", "host = 'http://127.0.0.1:3001'\n", "# Use the pipeline you prefer\n", "pipeline_file_url = 'https://storage.googleapis.com/jingzhangjz-project-pipelines/benchmarks/taxi.yaml'\n", "# number of runs you want to create\n", "num_runs = 5\n", "# Periodically check whether the runs have been finished.\n", "run_status_polling_interval_sec = 60\n", "\n", "\n", "def random_suffix() -> string:\n", " return ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))\n", "\n", "def run_finished(run_status: string) -> bool:\n", " return run_status in {'Succeeded', 'Failed', 'Error', 'Skipped', 'Terminated'}\n", "\n", "def run_succeeded(run_status: string) -> bool:\n", " return run_status in {'Succeeded'}\n", "\n", "\n", "if __name__ == '__main__':\n", " client = kfp.Client(host)\n", " \n", " # Create a pipeline and we'll use its default version to create runs.\n", " api_url = kfp_server_api.models.ApiUrl(pipeline_file_url)\n", " api_pipeline = kfp_server_api.models.ApiPipeline(\n", " name='pipeline-' + random_suffix(),\n", " url=api_url)\n", " pipeline = client.pipelines.create_pipeline(body=api_pipeline)\n", " default_version_id = pipeline.default_version.id\n", "\n", " # Create an experiment.\n", " experiment_name = 'experiment-' + random_suffix()\n", " experiment = client.experiments.create_experiment(body={'name' : experiment_name})\n", " experiment_id = experiment.id\n", " \n", " # Measure create run latency. Note this time is the roundrip latency of CreateRun method. The actual run is \n", " # not finished when client side gets the CreateRun response. 
    "\n",
    "if __name__ == '__main__':\n",
    "    client = kfp.Client(host)\n",
    "\n",
    "    # Create a pipeline; its default version is used to create the runs.\n",
    "    api_url = kfp_server_api.models.ApiUrl(pipeline_file_url)\n",
    "    api_pipeline = kfp_server_api.models.ApiPipeline(\n",
    "        name='pipeline-' + random_suffix(),\n",
    "        url=api_url)\n",
    "    pipeline = client.pipelines.create_pipeline(body=api_pipeline)\n",
    "    default_version_id = pipeline.default_version.id\n",
    "\n",
    "    # Create an experiment.\n",
    "    experiment_name = 'experiment-' + random_suffix()\n",
    "    experiment = client.experiments.create_experiment(body={'name': experiment_name})\n",
    "    experiment_id = experiment.id\n",
    "\n",
    "    # Measure create-run latency. Note that this is the roundtrip latency of the\n",
    "    # CreateRun method: the run itself has not finished when the client receives\n",
    "    # the CreateRun response. Run duration is measured below, once the run has\n",
    "    # actually finished.\n",
    "    created_runs = []\n",
    "    create_run_latencies = []\n",
    "    for i in range(num_runs):\n",
    "        resource_references = []\n",
    "        key = kfp_server_api.models.ApiResourceKey(id=experiment_id, type=kfp_server_api.models.ApiResourceType.EXPERIMENT)\n",
    "        reference = kfp_server_api.models.ApiResourceReference(key=key, relationship=kfp_server_api.models.ApiRelationship.OWNER)\n",
    "        resource_references.append(reference)\n",
    "        key = kfp_server_api.models.ApiResourceKey(id=default_version_id, type=kfp_server_api.models.ApiResourceType.PIPELINE_VERSION)\n",
    "        reference = kfp_server_api.models.ApiResourceReference(key=key, relationship=kfp_server_api.models.ApiRelationship.CREATOR)\n",
    "        resource_references.append(reference)\n",
    "        # If the pipeline you chose needs parameters to create a run, specify them here.\n",
    "        parameters = []\n",
    "        parameter = kfp_server_api.ApiParameter(name='pipeline-root', value='gs://jingzhangjz-project-outputs/tfx_taxi_simple/{{workflow.uid}}')\n",
    "        parameters.append(parameter)\n",
    "        parameter = kfp_server_api.ApiParameter(name='data-root', value='gs://ml-pipeline-playground/tfx_taxi_simple/data')\n",
    "        parameters.append(parameter)\n",
    "        parameter = kfp_server_api.ApiParameter(name='module-file', value='gs://ml-pipeline-playground/tfx_taxi_simple/modules/taxi_utils.py')\n",
    "        parameters.append(parameter)\n",
    "        pipeline_spec = kfp_server_api.ApiPipelineSpec(parameters=parameters)\n",
    "\n",
    "        start = time.perf_counter()\n",
    "        run_name = 'run-' + random_suffix()\n",
    "        run = client.runs.create_run(body={'name': run_name, 'resource_references': resource_references, 'pipeline_spec': pipeline_spec})\n",
    "        dur = time.perf_counter() - start\n",
    "        create_run_latencies.append(dur)\n",
    "        created_runs.append(run.run.id)\n",
    "\n",
    "    # Wait for the runs to finish.\n",
    "    # TODO(jingzhang36): We can add a timeout for this polling; a possible shape\n",
    "    # is sketched below. For now we rely on the timeout of runs in KFP.\n",
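    "\n",
    "    # A sketch of the timeout guard suggested by the TODO above, assuming a\n",
    "    # hypothetical helper name and default deadline; it is illustrative and is\n",
    "    # not invoked by the benchmark, which keeps the original unbounded polling.\n",
    "    def all_runs_finished_within(run_ids, timeout_sec=4 * 3600) -> bool:\n",
    "        deadline = time.monotonic() + timeout_sec\n",
    "        while time.monotonic() < deadline:\n",
    "            if all(run_finished(client.runs.get_run(r).run.status) for r in run_ids):\n",
    "                return True\n",
    "            time.sleep(run_status_polling_interval_sec)\n",
    "        return False\n",
    "\n",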
\n", " while True:\n", " all_finished = True\n", " for i in created_runs:\n", " run = client.runs.get_run(i) \n", " if not run_finished(run.run.status):\n", " all_finished = False\n", " break\n", " if all_finished: \n", " break\n", " else:\n", " time.sleep(run_status_polling_interval_sec)\n", "\n", " # When all runs are finished, measure run durations and get run latencies.\n", " get_run_latencies = []\n", " succeeded_run_durations = []\n", " run_results = []\n", " for i in created_runs:\n", " start = time.perf_counter()\n", " run = client.runs.get_run(i) \n", " dur = time.perf_counter() - start\n", " get_run_latencies.append(dur) \n", " if run_succeeded(run.run.status):\n", " run_results.append('succeeded')\n", " succeeded_run_durations.append((run.run.finished_at - run.run.created_at).total_seconds())\n", " else:\n", " run_results.append('not_succeeded')\n", "\n", " # Measure delete run latency.\n", " delete_run_latencies = []\n", " for i in created_runs:\n", " start = time.perf_counter()\n", " run = client.runs.delete_run(i) \n", " dur = time.perf_counter() - start\n", " delete_run_latencies.append(dur) \n", " \n", " # Cleanup\n", " client.pipelines.delete_pipeline(pipeline.id)\n", " client.experiments.delete_experiment(experiment.id)\n", " \n", " # Plots\n", " fig, axs = plt.subplots(nrows=4, figsize=(10,20))\n", " \n", " axs[0].set(title='Create Run Latency', xlabel='Time (Second)', ylabel='Create')\n", " sns.distplot(a=create_run_latencies, ax=axs[0], hist=True, kde=False, rug=True)\n", " \n", " axs[1].set(title='Run Durations', xlabel='Time (Second)', ylabel='Run')\n", " sns.distplot(a=succeeded_run_durations, ax=axs[1], hist=True, kde=False, rug=True) \n", " \n", " axs[2].set(title='Get Run Latency', xlabel='Time (Second)', ylabel='Get')\n", " sns.distplot(a=get_run_latencies, ax=axs[2], hist=True, kde=False, rug=True) \n", " \n", " axs[3].set(title='Delete Run Latency', xlabel='Time (Second)', ylabel='Delete')\n", " sns.distplot(a=delete_run_latencies, ax=axs[3], hist=True, kde=False, rug=True)\n", "\n", " loaded_run_results = pd.DataFrame(np.array(run_results), columns=['result'])\n", " sns.catplot(x='result', kind=\"count\", data=loaded_run_results)\n", " " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "file_extension": ".py", "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" }, "mimetype": "text/x-python", "name": "python", "npconvert_exporter": "python", "pygments_lexer": "ipython3", "version": 3 }, "nbformat": 4, "nbformat_minor": 2 }