145 lines
4.8 KiB
Plaintext
145 lines
4.8 KiB
Plaintext
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"kfp_endpoint = None\n",
|
|
"\n",
|
|
"import datetime\n",
|
|
"import time\n",
|
|
"\n",
|
|
"import kfp.deprecated as kfp\n",
|
|
"from kfp.deprecated.components import create_component_from_func\n",
|
|
"\n",
|
|
"\n",
|
|
"@create_component_from_func\n",
|
|
"def do_work_op(seconds: float = 60) -> str:\n",
|
|
" import datetime\n",
|
|
" import time\n",
|
|
" print(f\"Working for {seconds} seconds.\")\n",
|
|
" for i in range(int(seconds)):\n",
|
|
" print(f\"Working: {i}.\")\n",
|
|
" time.sleep(1)\n",
|
|
" print(\"Done.\")\n",
|
|
" return datetime.datetime.now().isoformat()\n",
|
|
"\n",
|
|
"\n",
|
|
"def caching_pipeline(seconds: float = 60):\n",
|
|
" # All outputs of successful executions are cached\n",
|
|
" work_task = do_work_op(seconds)\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Test 1\n",
|
|
"# Running the pipeline for the first time.\n",
|
|
"# The pipeline performs work and the results are cached.\n",
|
|
"# The pipeline run time should be ~60 seconds.\n",
|
|
"print(\"Starting test 1\")\n",
|
|
"start_time = datetime.datetime.now()\n",
|
|
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
|
|
" caching_pipeline,\n",
|
|
" arguments=dict(seconds=60),\n",
|
|
").wait_for_run_completion(timeout=999)\n",
|
|
"elapsed_time = datetime.datetime.now() - start_time\n",
|
|
"print(f\"Total run time: {int(elapsed_time.total_seconds())} seconds\")\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Test 2\n",
|
|
"# Running the pipeline for the second time.\n",
|
|
"# The pipeline should reuse the cached results and complete faster.\n",
|
|
"# The pipeline run time should be <60 seconds.\n",
|
|
"print(\"Starting test 2\")\n",
|
|
"start_time = datetime.datetime.now()\n",
|
|
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
|
|
" caching_pipeline,\n",
|
|
" arguments=dict(seconds=60),\n",
|
|
").wait_for_run_completion(timeout=999)\n",
|
|
"elapsed_time = datetime.datetime.now() - start_time\n",
|
|
"print(f\"Total run time: {int(elapsed_time.total_seconds())} seconds\")\n",
|
|
"\n",
|
|
"if elapsed_time.total_seconds() > 60:\n",
|
|
" raise RuntimeError(\"The cached execution was not re-used or pipeline run took too long to complete.\")\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Test 3\n",
|
|
"# For each task we can specify the maximum cached data staleness.\n",
|
|
"# For example: task.execution_options.caching_strategy.max_cache_staleness = \"P7D\" # (7 days)\n",
|
|
"# The `max_cache_staleness` attribute uses the [RFC3339 duration format](https://tools.ietf.org/html/rfc3339#appendix-A). For example: \"P0D\" (0 days), \"PT5H\" (5 hours; notice the \"T\")\n",
|
|
"# Cached results that are older than the specified time span are not reused.\n",
|
|
"# In this case, the pipeline should not reuse the cached results, since they will be stale.\n",
|
|
"\n",
|
|
"def caching_pipeline3(seconds: float = 60):\n",
|
|
" # All outputs of successful executions are cached\n",
|
|
" work_task = do_work_op(seconds)\n",
|
|
" # TODO(Ark-kun): Fix handling non-zero periods in the backend\n",
|
|
" work_task.execution_options.caching_strategy.max_cache_staleness = 'P0D' # = Period: 0 Days\n",
|
|
"\n",
|
|
"# Waiting for some time for the cached data to become stale:\n",
|
|
"time.sleep(10)\n",
|
|
"print(\"Starting test 3\")\n",
|
|
"start_time = datetime.datetime.now()\n",
|
|
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
|
|
" caching_pipeline3,\n",
|
|
" arguments=dict(seconds=60),\n",
|
|
").wait_for_run_completion(timeout=999)\n",
|
|
"elapsed_time = datetime.datetime.now() - start_time\n",
|
|
"print(f\"Total run time: {int(elapsed_time.total_seconds())} seconds\")\n",
|
|
"\n",
|
|
"if elapsed_time.total_seconds() < 60:\n",
|
|
" raise RuntimeError(\"The cached execution was apparently re-used, but that should not happen.\")\n"
|
|
]
|
|
}
|
|
],
|
|
"metadata": {
|
|
"celltoolbar": "Tags",
|
|
"kernelspec": {
|
|
"display_name": "Python 3",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"codemirror_mode": {
|
|
"name": "ipython",
|
|
"version": 3
|
|
},
|
|
"file_extension": ".py",
|
|
"mimetype": "text/x-python",
|
|
"name": "python",
|
|
"nbconvert_exporter": "python",
|
|
"pygments_lexer": "ipython3",
|
|
"version": "3.6.7"
|
|
},
|
|
"pycharm": {
|
|
"stem_cell": {
|
|
"cell_type": "raw",
|
|
"metadata": {
|
|
"collapsed": false
|
|
},
|
|
"source": []
|
|
}
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 2
|
|
}
|