pipelines/samples/core/caching/caching.ipynb

145 lines
4.8 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import datetime\n",
"import time\n",
"\n",
"import kfp.deprecated as kfp\n",
"from kfp.deprecated.components import create_component_from_func\n",
"\n",
"# KFP API endpoint. None lets kfp.Client() fall back to its default connection settings.\n",
"kfp_endpoint = None\n",
"\n",
"\n",
"@create_component_from_func\n",
"def do_work_op(seconds: float = 60) -> str:\n",
"    \"\"\"Simulates a long-running task by sleeping for `seconds` seconds.\n",
"\n",
"    Args:\n",
"        seconds: How long to work, in seconds. Fractional values are honored;\n",
"            zero or negative values finish immediately.\n",
"\n",
"    Returns:\n",
"        The ISO 8601 timestamp at which the work finished. The value differs on\n",
"        every real execution, so it also shows whether a cached result was reused.\n",
"    \"\"\"\n",
"    # Imports live inside the function because the component function must be self-contained.\n",
"    import datetime\n",
"    import time\n",
"\n",
"    print(f\"Working for {seconds} seconds.\")\n",
"    whole_seconds = int(seconds)\n",
"    for i in range(whole_seconds):\n",
"        print(f\"Working: {i}.\")\n",
"        time.sleep(1)\n",
"    # Sleep the fractional remainder too, so non-integer durations are not truncated.\n",
"    remainder = seconds - whole_seconds\n",
"    if remainder > 0:\n",
"        time.sleep(remainder)\n",
"    print(\"Done.\")\n",
"    return datetime.datetime.now().isoformat()\n",
"\n",
"\n",
"def caching_pipeline(seconds: float = 60):\n",
"    \"\"\"Single-step pipeline that runs do_work_op for `seconds` seconds.\"\"\"\n",
"    # All outputs of successful executions are cached.\n",
"    do_work_op(seconds)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Test 1\n",
"# Run the pipeline for the first time.\n",
"# The pipeline performs the work and its results are cached.\n",
"# The run time should be ~60 seconds (it may be shorter if a matching cached\n",
"# result already exists on the cluster from an earlier session).\n",
"print(\"Starting test 1\")\n",
"start_time = datetime.datetime.now()\n",
"run_detail = kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
"    caching_pipeline,\n",
"    arguments=dict(seconds=60),\n",
").wait_for_run_completion(timeout=999)\n",
"elapsed_time = datetime.datetime.now() - start_time\n",
"print(f\"Total run time: {int(elapsed_time.total_seconds())} seconds\")\n",
"\n",
"# Only successful executions are cached, so a failed run here would invalidate tests 2 and 3.\n",
"run_status = run_detail.run.status\n",
"if (run_status or \"\").lower() != \"succeeded\":\n",
"    raise RuntimeError(f\"Pipeline run did not succeed: status={run_status}\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Test 2\n",
"# Run the pipeline a second time with the same arguments.\n",
"# The pipeline should reuse the cached results and complete faster.\n",
"# The run time should be <60 seconds.\n",
"print(\"Starting test 2\")\n",
"start_time = datetime.datetime.now()\n",
"run_detail = kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
"    caching_pipeline,\n",
"    arguments=dict(seconds=60),\n",
").wait_for_run_completion(timeout=999)\n",
"elapsed_time = datetime.datetime.now() - start_time\n",
"print(f\"Total run time: {int(elapsed_time.total_seconds())} seconds\")\n",
"\n",
"# Check the status first: a failed run also finishes quickly and would\n",
"# otherwise be mistaken for a cache hit.\n",
"run_status = run_detail.run.status\n",
"if (run_status or \"\").lower() != \"succeeded\":\n",
"    raise RuntimeError(f\"Pipeline run did not succeed: status={run_status}\")\n",
"if elapsed_time.total_seconds() > 60:\n",
"    raise RuntimeError(\"The cached execution was not re-used or pipeline run took too long to complete.\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Test 3\n",
"# For each task we can specify the maximum staleness of cached data.\n",
"# For example: task.execution_options.caching_strategy.max_cache_staleness = \"P7D\"  # (7 days)\n",
"# The `max_cache_staleness` attribute uses the ISO 8601 duration format, as given in\n",
"# [RFC 3339, Appendix A](https://tools.ietf.org/html/rfc3339#appendix-A). For example: \"P0D\" (0 days), \"PT5H\" (5 hours; notice the \"T\").\n",
"# Cached results that are older than the specified time span are not reused.\n",
"# Here the limit is 0 days, so the result cached by the earlier runs is stale\n",
"# and the pipeline should perform the work again.\n",
"\n",
"def caching_pipeline3(seconds: float = 60):\n",
"    \"\"\"Like caching_pipeline, but its task treats any existing cached result as stale.\"\"\"\n",
"    work_task = do_work_op(seconds)\n",
"    # TODO(Ark-kun): Fix handling non-zero periods in the backend\n",
"    work_task.execution_options.caching_strategy.max_cache_staleness = 'P0D'  # = Period: 0 days\n",
"\n",
"# Wait for some time so that the cached data becomes stale:\n",
"time.sleep(10)\n",
"print(\"Starting test 3\")\n",
"start_time = datetime.datetime.now()\n",
"run_detail = kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
"    caching_pipeline3,\n",
"    arguments=dict(seconds=60),\n",
").wait_for_run_completion(timeout=999)\n",
"elapsed_time = datetime.datetime.now() - start_time\n",
"print(f\"Total run time: {int(elapsed_time.total_seconds())} seconds\")\n",
"\n",
"# Check the status first: a failed run also finishes quickly and would\n",
"# otherwise be reported as an unexpected cache hit.\n",
"run_status = run_detail.run.status\n",
"if (run_status or \"\").lower() != \"succeeded\":\n",
"    raise RuntimeError(f\"Pipeline run did not succeed: status={run_status}\")\n",
"if elapsed_time.total_seconds() < 60:\n",
"    raise RuntimeError(\"The cached execution was apparently re-used, but that should not happen.\")\n"
]
}
],
"metadata": {
"celltoolbar": "Tags",
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
},
"pycharm": {
"stem_cell": {
"cell_type": "raw",
"metadata": {
"collapsed": false
},
"source": []
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}