{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "kfp_endpoint = None\n",
    "\n",
    "import datetime\n",
    "import time\n",
    "\n",
    "import kfp.deprecated as kfp\n",
    "from kfp.deprecated.components import create_component_from_func\n",
    "\n",
    "# Seconds of simulated work per task; a cache hit must finish faster than this.\n",
    "WORK_SECONDS = 60\n",
    "# Maximum number of seconds to wait for a single pipeline run to finish.\n",
    "RUN_TIMEOUT_SECONDS = 999\n",
    "\n",
    "\n",
    "@create_component_from_func\n",
    "def do_work_op(seconds: float = 60) -> str:\n",
    "    \"\"\"Simulates a slow task and returns its completion time as an ISO 8601 string.\n",
    "\n",
    "    Sleeps one second per whole second requested (the fractional part of\n",
    "    `seconds` is truncated) and prints progress along the way.\n",
    "    \"\"\"\n",
    "    # NOTE(review): the imports are kept inside the function body, presumably\n",
    "    # because the component is built from this function alone and cannot rely\n",
    "    # on the notebook's module-level imports - confirm against the KFP docs.\n",
    "    import datetime\n",
    "    import time\n",
    "    print(f\"Working for {seconds} seconds.\")\n",
    "    for i in range(int(seconds)):\n",
    "        print(f\"Working: {i}.\")\n",
    "        time.sleep(1)\n",
    "    print(\"Done.\")\n",
    "    return datetime.datetime.now().isoformat()\n",
    "\n",
    "\n",
    "def caching_pipeline(seconds: float = 60):\n",
    "    \"\"\"Pipeline with a single `do_work_op` task that uses the default caching behavior.\"\"\"\n",
    "    # All outputs of successful executions are cached\n",
    "    work_task = do_work_op(seconds)\n",
    "\n",
    "\n",
    "def run_and_time(pipeline_func, seconds: float = WORK_SECONDS) -> float:\n",
    "    \"\"\"Runs `pipeline_func` once and returns the elapsed wall-clock time in seconds.\n",
    "\n",
    "    Args:\n",
    "        pipeline_func: Pipeline function to submit; it must accept a `seconds` argument.\n",
    "        seconds: Value passed to the pipeline's `seconds` argument.\n",
    "\n",
    "    Returns:\n",
    "        Seconds elapsed between submitting the run and its completion.\n",
    "\n",
    "    Raises:\n",
    "        RuntimeError: If the run finished with a status other than \"Succeeded\".\n",
    "    \"\"\"\n",
    "    start_time = datetime.datetime.now()\n",
    "    run_detail = kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
    "        pipeline_func,\n",
    "        arguments=dict(seconds=seconds),\n",
    "    ).wait_for_run_completion(timeout=RUN_TIMEOUT_SECONDS)\n",
    "    elapsed_time = datetime.datetime.now() - start_time\n",
    "    print(f\"Total run time: {int(elapsed_time.total_seconds())} seconds\")\n",
    "\n",
    "    # A run that fails quickly must not be mistaken for a fast cache hit.\n",
    "    run_status = str(run_detail.run.status)\n",
    "    if run_status.lower() != \"succeeded\":\n",
    "        raise RuntimeError(f\"The pipeline run finished with status {run_status!r}.\")\n",
    "    return elapsed_time.total_seconds()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Test 1\n",
    "# Running the pipeline for the first time.\n",
    "# The pipeline performs work and the results are cached.\n",
    "# The pipeline run time should be ~60 seconds.\n",
    "print(\"Starting test 1\")\n",
    "test1_seconds = run_and_time(caching_pipeline)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Test 2\n",
    "# Running the pipeline the second time.\n",
    "# The pipeline should reuse the cached results and complete faster.\n",
    "# The pipeline run time should be <60 seconds.\n",
    "print(\"Starting test 2\")\n",
    "test2_seconds = run_and_time(caching_pipeline)\n",
    "\n",
    "if test2_seconds > WORK_SECONDS:\n",
    "    raise RuntimeError(\"The cached execution was not re-used or pipeline run took too long to complete.\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Test 3\n",
    "# For each task we can specify the maximum cached data staleness.\n",
    "# For example: task.execution_options.caching_strategy.max_cache_staleness = \"P7D\" # (7 days)\n",
    "# The `max_cache_staleness` attribute uses the [RFC3339 duration format](https://tools.ietf.org/html/rfc3339#appendix-A). For example: \"P0D\" (0 days), \"PT5H\" (5 hours; notice the \"T\")\n",
    "# Cached results that are older than the specified time span are not reused.\n",
    "# In this case, the pipeline should not reuse the cached result, since it will be stale.\n",
    "\n",
    "# Seconds to wait before the run so that the previously cached result is older than the allowed staleness.\n",
    "STALENESS_WAIT_SECONDS = 10\n",
    "\n",
    "\n",
    "def caching_pipeline3(seconds: float = 60):\n",
    "    \"\"\"Pipeline with a single `do_work_op` task whose cached results are always treated as stale.\"\"\"\n",
    "    # All outputs of successful executions are cached\n",
    "    work_task = do_work_op(seconds)\n",
    "    # TODO(Ark-kun): Fix handling non-zero periods in the backend\n",
    "    work_task.execution_options.caching_strategy.max_cache_staleness = 'P0D'  # Period of 0 days: any older cached result is stale\n",
    "\n",
    "\n",
    "# Waiting for some time for the cached data to become stale:\n",
    "time.sleep(STALENESS_WAIT_SECONDS)\n",
    "print(\"Starting test 3\")\n",
    "test3_seconds = run_and_time(caching_pipeline3)\n",
    "\n",
    "if test3_seconds < WORK_SECONDS:\n",
    "    raise RuntimeError(\"The cached execution was apparently re-used, but that should not happen.\")\n"
   ]
  }
 ],
 "metadata": {
  "celltoolbar": "Tags",
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.7"
  },
  "pycharm": {
   "stem_cell": {
    "cell_type": "raw",
    "metadata": {
     "collapsed": false
    },
    "source": []
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}