pipelines/samples/core/caching/caching.ipynb

140 lines
4.1 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"kfp_endpoint = None\n",
"\n",
"import datetime\n",
"import time\n",
"\n",
"import kfp as kfp\n",
"from kfp import dsl\n",
"\n",
"\n",
"@dsl.component\n",
"def do_work_op(seconds: float = 60) -> str:\n",
"    \"\"\"Simulate a slow task by sleeping, then report when it finished.\n",
"\n",
"    Args:\n",
"        seconds: How long to work for. Whole seconds are slept one at a time\n",
"            with a progress message; any fractional remainder is slept at\n",
"            the end. Zero or negative values do no work.\n",
"\n",
"    Returns:\n",
"        The local time at which the work finished, as an ISO 8601 string.\n",
"    \"\"\"\n",
"    # Imported inside the function body rather than relying on the notebook's\n",
"    # top-level imports (presumably because KFP runs the component body on its\n",
"    # own, outside this notebook's namespace).\n",
"    import datetime\n",
"    import time\n",
"\n",
"    print(f\"Working for {seconds} seconds.\")\n",
"    whole_seconds = int(seconds)\n",
"    for i in range(whole_seconds):\n",
"        print(f\"Working: {i}.\")\n",
"        time.sleep(1)\n",
"    # int() truncates, so sleep the leftover fraction to honour the full request\n",
"    # (e.g. seconds=0.5 previously did no work at all).\n",
"    remainder = seconds - whole_seconds\n",
"    if remainder > 0:\n",
"        time.sleep(remainder)\n",
"    print(\"Done.\")\n",
"    return datetime.datetime.now().isoformat()\n",
"\n",
"\n",
"@kfp.dsl.pipeline(name='caching-pipeline')\n",
"def caching_pipeline(seconds: float = 60):\n",
"    # Single-step pipeline that does not touch the caching options, so the\n",
"    # outputs of successful executions are cached.\n",
"    do_work_op(seconds=seconds)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Test 1\n",
"# Running the pipeline for the first time.\n",
"# The pipeline performs work and the results are cached.\n",
"# The pipeline run time should be ~60 seconds.\n",
"print(\"Starting test 1\")\n",
"# Measure the duration with a monotonic clock: unlike datetime.now(), it cannot\n",
"# jump if the system clock is adjusted while the run is in progress.\n",
"start_time = time.monotonic()\n",
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
"    caching_pipeline,\n",
"    arguments=dict(seconds=60),\n",
").wait_for_run_completion(timeout=999)\n",
"elapsed_seconds = time.monotonic() - start_time\n",
"print(f\"Total run time: {int(elapsed_seconds)} seconds\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Test 2\n",
"# Running the pipeline the second time.\n",
"# The pipeline should reuse the cached results and complete faster.\n",
"# The pipeline run time should be <60 seconds.\n",
"print(\"Starting test 2\")\n",
"# Measure the duration with a monotonic clock: unlike datetime.now(), it cannot\n",
"# jump if the system clock is adjusted while the run is in progress.\n",
"start_time = time.monotonic()\n",
"run = kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
"    caching_pipeline,\n",
"    arguments=dict(seconds=60),\n",
").wait_for_run_completion(timeout=999)\n",
"elapsed_seconds = time.monotonic() - start_time\n",
"print(f\"Total run time: {int(elapsed_seconds)} seconds\")\n",
"\n",
"# A run that fails quickly also finishes in under 60 seconds, so rule that out\n",
"# before treating a short run time as a cache hit.\n",
"# NOTE(review): assumes the returned run object exposes a `state` attribute\n",
"# (as in the KFP v2 API); the check is skipped when it does not.\n",
"run_state = getattr(run, \"state\", None)\n",
"if run_state is not None and str(run_state).upper() != \"SUCCEEDED\":\n",
"    raise RuntimeError(f\"The pipeline run did not succeed (state: {run_state}).\")\n",
"\n",
"if elapsed_seconds > 60:\n",
"    raise RuntimeError(\"The cached execution was not re-used or pipeline run took too long to complete.\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"# Test 3\n",
"# In this case, the pipeline should not reuse the cached result, since caching is disabled for the task.\n",
"@kfp.dsl.pipeline(name='caching-pipeline3')\n",
"def caching_pipeline3(seconds: float = 60):\n",
"    # Same single step as caching_pipeline, but with caching switched off on the task.\n",
"    work_task = do_work_op(seconds=seconds)\n",
"    work_task.set_caching_options(enable_caching=False)\n",
"\n",
"print(\"Starting test 3\")\n",
"# Measure the duration with a monotonic clock: unlike datetime.now(), it cannot\n",
"# jump if the system clock is adjusted while the run is in progress.\n",
"start_time = time.monotonic()\n",
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
"    caching_pipeline3,\n",
"    arguments=dict(seconds=60),\n",
").wait_for_run_completion(timeout=999)\n",
"elapsed_seconds = time.monotonic() - start_time\n",
"print(f\"Total run time: {int(elapsed_seconds)} seconds\")\n",
"\n",
"# The task sleeps for 60 seconds, so a shorter run means the work was skipped.\n",
"if elapsed_seconds < 60:\n",
"    raise RuntimeError(\"The cached execution was apparently re-used, but that should not happen.\")\n"
]
}
],
"metadata": {
"celltoolbar": "Tags",
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
},
"pycharm": {
"stem_cell": {
"cell_type": "raw",
"metadata": {
"collapsed": false
},
"source": []
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}