{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kfp_endpoint = None\n", "\n", "import datetime\n", "import time\n", "\n", "import kfp as kfp\n", "from kfp import dsl\n", "\n", "\n", "@dsl.component\n", "def do_work_op(seconds: float = 60) -> str:\n", " import datetime\n", " import time\n", " print(f\"Working for {seconds} seconds.\")\n", " for i in range(int(seconds)):\n", " print(f\"Working: {i}.\")\n", " time.sleep(1)\n", " print(\"Done.\")\n", " return datetime.datetime.now().isoformat()\n", "\n", "\n", "@kfp.dsl.pipeline(name='caching-pipeline')\n", "def caching_pipeline(seconds: float = 60):\n", " # All outputs of successful executions are cached\n", " work_task = do_work_op(seconds=seconds)\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test 1\n", "# Running the pipeline for the first time.\n", "# The pipeline performs work and the results are cached.\n", "# The pipeline run time should be ~60 seconds.\n", "print(\"Starting test 1\")\n", "start_time = datetime.datetime.now()\n", "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n", " caching_pipeline,\n", " arguments=dict(seconds=60),\n", ").wait_for_run_completion(timeout=999)\n", "elapsed_time = datetime.datetime.now() - start_time\n", "print(f\"Total run time: {int(elapsed_time.total_seconds())} seconds\")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test 2\n", "# Running the pipeline the second time.\n", "# The pipeline should reuse the cached results and complete faster.\n", "# The pipeline run time should be <60 seconds.\n", "print(\"Starting test 2\")\n", "start_time = datetime.datetime.now()\n", "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n", " caching_pipeline,\n", " arguments=dict(seconds=60),\n", ").wait_for_run_completion(timeout=999)\n", "elapsed_time = datetime.datetime.now() - start_time\n", "print(f\"Total run time: {int(elapsed_time.total_seconds())} seconds\")\n", "\n", "if elapsed_time.total_seconds() > 60:\n", " raise RuntimeError(\"The cached execution was not re-used or pipeline run took to long to complete.\")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# Test 3\n", "# In this case, the pipeline should not reuse the cached result, since they are disabled.\n", "@kfp.dsl.pipeline(name='caching-pipeline3')\n", "def caching_pipeline3(seconds: float = 60):\n", " work_task = do_work_op(seconds=seconds)\n", " work_task.set_caching_options(enable_caching=False)\n", "\n", "print(\"Starting test 3\")\n", "start_time = datetime.datetime.now()\n", "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n", " caching_pipeline3,\n", " arguments=dict(seconds=60),\n", ").wait_for_run_completion(timeout=999)\n", "elapsed_time = datetime.datetime.now() - start_time\n", "print(f\"Total run time: {int(elapsed_time.total_seconds())} seconds\")\n", "\n", "if elapsed_time.total_seconds() < 60:\n", " raise RuntimeError(\"The cached execution was apparently re-used, but that should not happen.\")\n" ] } ], "metadata": { "celltoolbar": "Tags", "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.7" }, "pycharm": { "stem_cell": { "cell_type": "raw", "metadata": { "collapsed": false }, "source": [] } } }, "nbformat": 4, "nbformat_minor": 2 }