{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pipeline operations benchmark\n",
    "\n",
    "This benchmark measures the performance of pipeline related operations in Kubeflow Pipelines: the latencies of creating pipelines, creating pipeline versions, getting pipelines and deleting pipelines. Each latency distribution is plotted as a histogram at the end of the notebook.\n",
    "\n",
    "The notebook talks to a live KFP endpoint, so adjust the configuration cell before running it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import random\n",
    "import string\n",
    "import time\n",
    "\n",
    "import kfp\n",
    "import kfp_server_api\n",
    "import matplotlib.pyplot as plt\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import seaborn as sns\n",
    "from scipy import stats"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Configuration"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# CHANGE necessary parameters here\n",
    "# Host is your KFP endpoint\n",
    "host = 'http://127.0.0.1:3001'\n",
    "# Number of pipelines you want to create\n",
    "num_pipelines = 10\n",
    "# Number of pipeline versions you want to create under each pipeline\n",
    "num_pipeline_versions_per_pipeline = 10\n",
    "# Use the pipeline you prefer\n",
    "pipeline_file_url = 'https://storage.googleapis.com/jingzhangjz-project-pipelines/benchmarks/taxi.yaml'\n",
    "# Pause, in seconds, between two consecutive version creation calls\n",
    "# (the reason for the pause is explained where the versions are created)\n",
    "version_creation_interval_seconds = 2"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Helpers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def random_suffix() -> str:\n",
    "    \"\"\"Return a random 10-character string of lowercase letters and digits.\"\"\"\n",
    "    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))\n",
    "\n",
    "\n",
    "def timed_call(func, *args, **kwargs):\n",
    "    \"\"\"Call ``func(*args, **kwargs)`` and time it with ``time.perf_counter``.\n",
    "\n",
    "    Returns:\n",
    "        A ``(result, elapsed_seconds)`` tuple.\n",
    "    \"\"\"\n",
    "    start = time.perf_counter()\n",
    "    result = func(*args, **kwargs)\n",
    "    return result, time.perf_counter() - start\n",
    "\n",
    "\n",
    "def plot_latency_distribution(latencies, ax, title, ylabel):\n",
    "    \"\"\"Draw a count histogram plus a rug plot of ``latencies`` (seconds) on ``ax``.\n",
    "\n",
    "    ``sns.distplot`` is deprecated since seaborn 0.11 and scheduled for removal,\n",
    "    so ``histplot``/``rugplot`` are used when the installed seaborn provides\n",
    "    them; older seaborn versions fall back to ``distplot``.\n",
    "    \"\"\"\n",
    "    if hasattr(sns, 'histplot'):\n",
    "        sns.histplot(x=latencies, ax=ax, kde=False)\n",
    "        sns.rugplot(x=latencies, ax=ax)\n",
    "    else:\n",
    "        sns.distplot(a=latencies, ax=ax, hist=True, kde=False, rug=True)\n",
    "    # Label after plotting so that the labels below always win over seaborn defaults.\n",
    "    ax.set(title=title, xlabel='Time (Second)', ylabel=ylabel)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run the benchmark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = kfp.Client(host)\n",
    "api_url = kfp_server_api.models.ApiUrl(pipeline_file_url)\n",
    "\n",
    "# Create pipeline latency\n",
    "create_latencies = []\n",
    "created_pipeline_ids = []\n",
    "for _ in range(num_pipelines):\n",
    "    api_pipeline = kfp_server_api.models.ApiPipeline(\n",
    "        name='pipeline-' + random_suffix(),\n",
    "        url=api_url)\n",
    "    pipeline, dur = timed_call(client.pipelines.create_pipeline, body=api_pipeline)\n",
    "    create_latencies.append(dur)\n",
    "    created_pipeline_ids.append(pipeline.id)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create version latency\n",
    "create_version_latencies = []\n",
    "created_version_ids = []\n",
    "for pipeline_id in created_pipeline_ids:\n",
    "    key = kfp_server_api.models.ApiResourceKey(\n",
    "        id=pipeline_id, type=kfp_server_api.models.ApiResourceType.PIPELINE)\n",
    "    reference = kfp_server_api.models.ApiResourceReference(\n",
    "        key=key, relationship=kfp_server_api.models.ApiRelationship.OWNER)\n",
    "    resource_references = [reference]\n",
    "    for _ in range(num_pipeline_versions_per_pipeline):\n",
    "        api_pipeline_version = kfp_server_api.models.ApiPipelineVersion(\n",
    "            name='pipeline-version-' + random_suffix(),\n",
    "            package_url=api_url,\n",
    "            resource_references=resource_references)\n",
    "        pipeline_version, dur = timed_call(\n",
    "            client.pipelines.create_pipeline_version, body=api_pipeline_version)\n",
    "        create_version_latencies.append(dur)\n",
    "        created_version_ids.append(pipeline_version.id)\n",
    "        # We sometimes observe errors when version creation calls for the same pipeline are issued\n",
    "        # too close to each other. When a new version is added to a pipeline, the pipeline's default\n",
    "        # version is updated to the new version. Therefore, when we create a bunch of versions for\n",
    "        # the same pipeline in a row within a short period of time, these creation operations compete\n",
    "        # for a write lock on the same row of the pipelines table in our db. This is one possible\n",
    "        # hypothesis to explain the errors we have observed, and the symptom is worth further\n",
    "        # investigation. For now, we separate the version creation calls by a short pause.\n",
    "        time.sleep(version_creation_interval_seconds)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Get pipeline latency\n",
    "get_latencies = []\n",
    "for pipeline_id in created_pipeline_ids:\n",
    "    _, dur = timed_call(client.pipelines.get_pipeline, pipeline_id)\n",
    "    get_latencies.append(dur)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Delete pipeline latency\n",
    "delete_latencies = []\n",
    "for pipeline_id in created_pipeline_ids:\n",
    "    _, dur = timed_call(client.pipelines.delete_pipeline, pipeline_id)\n",
    "    delete_latencies.append(dur)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Plot the latency distributions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Plots\n",
    "fig, axs = plt.subplots(nrows=4, figsize=(10, 20))\n",
    "\n",
    "latencies_by_operation = [\n",
    "    ('Create Pipeline', create_latencies),\n",
    "    ('Create Pipeline Version', create_version_latencies),\n",
    "    ('Get Pipeline', get_latencies),\n",
    "    ('Delete Pipeline', delete_latencies),\n",
    "]\n",
    "for ax, (operation, latencies) in zip(axs, latencies_by_operation):\n",
    "    plot_latency_distribution(latencies, ax, title=operation + ' Latency', ylabel=operation)\n",
    "\n",
    "# TODO(jingzhang36): maybe dump the durations data to db or gcs, and let seaborn read from there"
   ]
  }
 ],
 "metadata": {
  "file_extension": ".py",
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  },
  "mimetype": "text/x-python",
  "name": "python",
  "npconvert_exporter": "python",
  "pygments_lexer": "ipython3",
  "version": 3
 },
 "nbformat": 4,
 "nbformat_minor": 2
}