Add two scripts to load test our api endpoints with measurement of run durations and api call latencies (#3587)
* script to profile pipeline api endpoint
* two plots
* another run api test
* clear cell output
* add some comments
* pipeline uses create pipeline
* add client
* checkpoint
* polish two scripts
* remove accidentally committed files
* added a success vs non-success plot; only measure run durations for succeeded runs
This commit is contained in:
parent
45d4cf7a09
commit
09a8689ef6
@ -0,0 +1,158 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# This benchmark measures the performance of pipeline-related operations in Kubeflow Pipelines, including latencies of creating/getting/deleting pipelines.\n",
"\n",
"import random\n",
"import kfp\n",
"import kfp_server_api\n",
"import os\n",
"import string\n",
"import time\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"import seaborn as sns\n",
"import matplotlib.pyplot as plt\n",
"from scipy import stats\n",
"\n",
"# CHANGE the necessary parameters here\n",
"# Host is your KFP endpoint\n",
"host = 'http://127.0.0.1:3001'\n",
"# Number of pipelines you want to create\n",
"num_pipelines = 10\n",
"# Number of pipeline versions you want to create under each pipeline\n",
"num_pipeline_versions_per_pipeline = 10\n",
"# Use the pipeline you prefer\n",
"pipeline_file_url = 'https://storage.googleapis.com/jingzhangjz-project-pipelines/benchmarks/taxi.yaml'\n",
"\n",
"\n",
"def random_suffix() -> str:\n",
"    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))\n",
"\n",
"\n",
"if __name__ == '__main__':\n",
"    client = kfp.Client(host)\n",
"    api_url = kfp_server_api.models.ApiUrl(pipeline_file_url)\n",
"    \n",
"    # Create pipeline latency\n",
"    create_latencies = []\n",
"    created_pipeline_ids = []\n",
"    for i in range(num_pipelines):\n",
"        api_pipeline = kfp_server_api.models.ApiPipeline(\n",
"            name='pipeline-' + random_suffix(),\n",
"            url=api_url)\n",
"        start = time.perf_counter()\n",
"        pipeline = client.pipelines.create_pipeline(body=api_pipeline)\n",
"        dur = time.perf_counter() - start\n",
"        create_latencies.append(dur)\n",
"        created_pipeline_ids.append(pipeline.id)\n",
"    \n",
"    # Create version latency\n",
"    create_version_latencies = []\n",
"    created_version_ids = []\n",
"    for pipeline_id in created_pipeline_ids:\n",
"        for j in range(num_pipeline_versions_per_pipeline):\n",
"            key = kfp_server_api.models.ApiResourceKey(id=pipeline_id, type=kfp_server_api.models.ApiResourceType.PIPELINE)\n",
"            reference = kfp_server_api.models.ApiResourceReference(key=key, relationship=kfp_server_api.models.ApiRelationship.OWNER)\n",
"            resource_references = [reference]\n",
"            api_pipeline_version = kfp_server_api.models.ApiPipelineVersion(\n",
"                name='pipeline-version-' + random_suffix(),\n",
"                package_url=api_url,\n",
"                resource_references=resource_references)\n",
"            start = time.perf_counter()\n",
"            pipeline_version = client.pipelines.create_pipeline_version(body=api_pipeline_version)\n",
"            dur = time.perf_counter() - start\n",
"            create_version_latencies.append(dur)\n",
"            created_version_ids.append(pipeline_version.id)\n",
"            # We sometimes observe errors when version creation calls for the same pipeline are made too close\n",
"            # together. When a new version is added to a pipeline, the pipeline's default version is updated to\n",
"            # the new version, so creating many versions for the same pipeline within a short period of time makes\n",
"            # these creation operations compete for a write lock on the same row of the pipelines table in our db.\n",
"            # This is one possible hypothesis for the errors we've observed, and the symptom is worth further\n",
"            # investigation. For now, we separate the version creation calls by 2 seconds.\n",
"            time.sleep(2)\n",
"    \n",
"    # Get pipeline latency\n",
"    get_latencies = []\n",
"    for i in created_pipeline_ids:\n",
"        start = time.perf_counter()\n",
"        pipeline = client.pipelines.get_pipeline(i)\n",
"        dur = time.perf_counter() - start\n",
"        get_latencies.append(dur)\n",
"    \n",
"    # Delete pipeline latency\n",
"    delete_latencies = []\n",
"    for i in created_pipeline_ids:\n",
"        start = time.perf_counter()\n",
"        pipeline = client.pipelines.delete_pipeline(i)\n",
"        dur = time.perf_counter() - start\n",
"        delete_latencies.append(dur)\n",
"\n",
"    # Plots\n",
"    fig, axs = plt.subplots(nrows=4, figsize=(10, 20))\n",
"    \n",
"    label_create_latencies = pd.Series(create_latencies, name='Create Pipeline Latency (Second)')\n",
"    sns.distplot(a=label_create_latencies, ax=axs[0])\n",
"    \n",
"    label_create_version_latencies = pd.Series(create_version_latencies, name='Create Pipeline Version Latency (Second)')\n",
"    sns.distplot(a=label_create_version_latencies, ax=axs[1])\n",
"    \n",
"    label_get_latencies = pd.Series(get_latencies, name='Get Pipeline Latency (Second)')\n",
"    sns.distplot(a=label_get_latencies, ax=axs[2])\n",
"    \n",
"    label_delete_latencies = pd.Series(delete_latencies, name='Delete Pipeline Latency (Second)')\n",
"    sns.distplot(a=label_delete_latencies, ax=axs[3])\n",
"    \n",
" # TODO(jingzhang36): maybe dump the durations data to db or gcs, and let searborn read from there"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"file_extension": ".py",
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
},
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": 3
},
"nbformat": 4,
"nbformat_minor": 2
}
@ -0,0 +1,197 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# This benchmark measures the performance of run-related operations in Kubeflow Pipelines, including run durations and latencies of creating/getting/deleting runs.\n",
"\n",
"import random\n",
"import kfp\n",
"import kfp_server_api\n",
"import os\n",
"import string\n",
"import time\n",
"from google.cloud import storage\n",
"from kfp.components import create_component_from_func\n",
"from datetime import datetime, timezone, timedelta\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"import seaborn as sns\n",
"import matplotlib.pyplot as plt\n",
"from scipy import stats\n",
"\n",
"# CHANGE the necessary parameters here\n",
"# Host is your KFP endpoint\n",
"host = 'http://127.0.0.1:3001'\n",
"# Use the pipeline you prefer\n",
"pipeline_file_url = 'https://storage.googleapis.com/jingzhangjz-project-pipelines/benchmarks/taxi.yaml'\n",
"# Number of runs you want to create\n",
"num_runs = 5\n",
"# Interval (in seconds) at which we poll whether the runs have finished.\n",
"run_status_polling_interval_sec = 60\n",
"\n",
"\n",
"def random_suffix() -> str:\n",
"    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))\n",
"\n",
"def run_finished(run_status: str) -> bool:\n",
"    return run_status in {'Succeeded', 'Failed', 'Error', 'Skipped', 'Terminated'}\n",
"\n",
"def run_succeeded(run_status: str) -> bool:\n",
"    return run_status in {'Succeeded'}\n",
"\n",
"\n",
"if __name__ == '__main__':\n",
"    client = kfp.Client(host)\n",
"    \n",
"    # Create a pipeline; we'll use its default version to create runs.\n",
"    api_url = kfp_server_api.models.ApiUrl(pipeline_file_url)\n",
"    api_pipeline = kfp_server_api.models.ApiPipeline(\n",
"        name='pipeline-' + random_suffix(),\n",
"        url=api_url)\n",
"    pipeline = client.pipelines.create_pipeline(body=api_pipeline)\n",
"    default_version_id = pipeline.default_version.id\n",
"\n",
"    # Create an experiment.\n",
"    experiment_name = 'experiment-' + random_suffix()\n",
"    experiment = client.experiments.create_experiment(body={'name': experiment_name})\n",
"    experiment_id = experiment.id\n",
"    \n",
"    # Measure create run latency. Note this is the round-trip latency of the CreateRun method; the actual run\n",
"    # has not finished when the client gets the CreateRun response. Run durations are measured below, once the\n",
"    # runs have actually finished.\n",
"    created_runs = []\n",
"    create_run_latencies = []\n",
"    for i in range(num_runs):\n",
"        resource_references = []\n",
"        key = kfp_server_api.models.ApiResourceKey(id=experiment_id, type=kfp_server_api.models.ApiResourceType.EXPERIMENT)\n",
"        reference = kfp_server_api.models.ApiResourceReference(key=key, relationship=kfp_server_api.models.ApiRelationship.OWNER)\n",
"        resource_references.append(reference)\n",
"        key = kfp_server_api.models.ApiResourceKey(id=default_version_id, type=kfp_server_api.models.ApiResourceType.PIPELINE_VERSION)\n",
"        reference = kfp_server_api.models.ApiResourceReference(key=key, relationship=kfp_server_api.models.ApiRelationship.CREATOR)\n",
"        resource_references.append(reference)\n",
"        # If the pipeline you choose needs parameters to create a run, specify them here.\n",
"        parameters = []\n",
"        parameter = kfp_server_api.ApiParameter(name='pipeline-root', value='gs://jingzhangjz-project-outputs/tfx_taxi_simple/{{workflow.uid}}')\n",
"        parameters.append(parameter)\n",
"        parameter = kfp_server_api.ApiParameter(name='data-root', value='gs://ml-pipeline-playground/tfx_taxi_simple/data')\n",
"        parameters.append(parameter)\n",
"        parameter = kfp_server_api.ApiParameter(name='module-file', value='gs://ml-pipeline-playground/tfx_taxi_simple/modules/taxi_utils.py')\n",
"        parameters.append(parameter)\n",
"        pipeline_spec = kfp_server_api.ApiPipelineSpec(parameters=parameters)\n",
"\n",
"        start = time.perf_counter()\n",
"        run_name = 'run-' + random_suffix()\n",
"        run = client.runs.create_run(body={'name': run_name, 'resource_references': resource_references, 'pipeline_spec': pipeline_spec})\n",
"        dur = time.perf_counter() - start\n",
"        create_run_latencies.append(dur)\n",
"        created_runs.append(run.run.id)\n",
"    \n",
"    # Wait for the runs to finish.\n",
"    # TODO(jingzhang36): We can add a timeout for this polling. For now we rely on the timeout of runs in KFP.\n",
"    while True:\n",
"        all_finished = True\n",
"        for i in created_runs:\n",
"            run = client.runs.get_run(i)\n",
"            if not run_finished(run.run.status):\n",
"                all_finished = False\n",
"                break\n",
"        if all_finished:\n",
"            break\n",
"        else:\n",
"            time.sleep(run_status_polling_interval_sec)\n",
"\n",
"    # When all runs are finished, measure run durations and get run latencies.\n",
"    get_run_latencies = []\n",
"    succeeded_run_durations = []\n",
"    run_results = []\n",
"    for i in created_runs:\n",
"        start = time.perf_counter()\n",
"        run = client.runs.get_run(i)\n",
"        dur = time.perf_counter() - start\n",
"        get_run_latencies.append(dur)\n",
"        if run_succeeded(run.run.status):\n",
"            run_results.append('succeeded')\n",
"            succeeded_run_durations.append((run.run.finished_at - run.run.created_at).total_seconds())\n",
"        else:\n",
"            run_results.append('not_succeeded')\n",
"\n",
"    # Measure delete run latency.\n",
"    delete_run_latencies = []\n",
"    for i in created_runs:\n",
"        start = time.perf_counter()\n",
"        run = client.runs.delete_run(i)\n",
"        dur = time.perf_counter() - start\n",
"        delete_run_latencies.append(dur)\n",
"    \n",
"    # Cleanup\n",
"    client.pipelines.delete_pipeline(pipeline.id)\n",
"    client.experiments.delete_experiment(experiment.id)\n",
"    \n",
"    # Plots\n",
"    fig, axs = plt.subplots(nrows=4, figsize=(10, 20))\n",
"    \n",
"    label_create_run_latencies = pd.Series(create_run_latencies, name='Create Run Latency (Second)')\n",
"    sns.distplot(a=label_create_run_latencies, ax=axs[0])\n",
"    \n",
"    label_run_durations = pd.Series(succeeded_run_durations, name='Run Durations (Second)')\n",
"    sns.distplot(a=label_run_durations, ax=axs[1])\n",
"\n",
"    label_get_run_latencies = pd.Series(get_run_latencies, name='Get Run Latency (Second)')\n",
"    sns.distplot(a=label_get_run_latencies, ax=axs[2])\n",
"    \n",
"    label_delete_run_latencies = pd.Series(delete_run_latencies, name='Delete Run Latency (Second)')\n",
"    sns.distplot(a=label_delete_run_latencies, ax=axs[3])\n",
"    \n",
"    loaded_run_results = pd.DataFrame(np.array(run_results), columns=['result'])\n",
"    sns.catplot(x='result', kind='count', data=loaded_run_results)\n",
" "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"file_extension": ".py",
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
},
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": 3
},
"nbformat": 4,
"nbformat_minor": 2
}