mirror of https://github.com/kubeflow/website.git
694 lines
30 KiB
Plaintext
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Building Python function-based components\n",
"> Building your own lightweight pipeline components using Python\n",
"\n",
"A Kubeflow Pipelines component is a self-contained set of code that performs one step in your\n",
"ML workflow. A pipeline component is composed of:\n",
"\n",
"* The component code, which implements the logic needed to perform a step in your ML workflow.\n",
"* A component specification, which defines the following:\n",
"\n",
"  * The component's metadata: its name and description.\n",
"  * The component's interface: the component's inputs and outputs.\n",
"  * The component's implementation: the Docker container image\n",
"    to run, how to pass inputs to your component code, and how\n",
"    to get the component's outputs.\n",
"\n",
"Python function-based components make it easier to iterate quickly by letting you build your\n",
"component code as a Python function and generating the [component specification][component-spec] for you.\n",
"This document describes how to build Python function-based components and use them in your pipeline.\n",
"\n",
"[component-spec]: https://www.kubeflow.org/docs/components/pipelines/reference/component-spec/\n",
"\n",
"## Before you begin\n",
"\n",
"1. Run the following command to install the Kubeflow Pipelines SDK. If you run this command in a Jupyter\n",
"   notebook, restart the kernel after installing the SDK."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip3 install kfp --upgrade"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"2. Import the `kfp` and `kfp.components` packages."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp\n",
"# Later examples refer to the components module as `comp`.\n",
"import kfp.components as comp\n",
"from kfp.components import create_component_from_func"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"3. Create an instance of the [`kfp.Client` class][kfp-client] following steps in [connecting to Kubeflow Pipelines using the SDK client][connect-api].\n",
"\n",
"[kfp-client]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/client.html#kfp.Client\n",
"[connect-api]: https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client = kfp.Client()  # change arguments accordingly"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For more information about the Kubeflow Pipelines SDK, see the [SDK reference guide][sdk-ref].\n",
"\n",
"[sdk-ref]: https://kubeflow-pipelines.readthedocs.io/en/stable/index.html\n",
"\n",
"## Getting started with Python function-based components\n",
"\n",
"This section demonstrates how to get started building Python function-based components by walking\n",
"through the process of creating a simple component.\n",
"\n",
"1. Define your component's code as a [standalone Python function](#standalone). In this example,\n",
"   the function adds two floats and returns the sum of the two arguments."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def add(a: float, b: float) -> float:\n",
"    '''Calculates sum of two arguments'''\n",
"    return a + b"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"2. Use `kfp.components.create_component_from_func` to generate the component specification YAML and return a\n",
"   factory function that you can use to create [`kfp.dsl.ContainerOp`][container-op] class instances for your pipeline.\n",
"   The component specification YAML is a reusable and shareable definition of your component.\n",
"\n",
"[container-op]: https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.ContainerOp"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"add_op = create_component_from_func(\n",
"    add, output_component_file='add_component.yaml')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"3. Create and run your pipeline. [Learn more about creating and running pipelines][build-pipelines].\n",
"\n",
"[build-pipelines]: https://www.kubeflow.org/docs/components/pipelines/sdk/build-component/"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp.dsl as dsl\n",
"@dsl.pipeline(\n",
"    name='Addition pipeline',\n",
"    description='An example pipeline that performs addition calculations.'\n",
")\n",
"def add_pipeline(\n",
"    a='1',\n",
"    b='7',\n",
"):\n",
"    # Passes a pipeline parameter and a constant value to the `add_op` factory\n",
"    # function.\n",
"    first_add_task = add_op(a, 4)\n",
"    # Passes an output reference from `first_add_task` and a pipeline parameter\n",
"    # to the `add_op` factory function. For operations with a single return\n",
"    # value, the output reference can be accessed as `task.output` or\n",
"    # `task.outputs['output_name']`.\n",
"    second_add_task = add_op(first_add_task.output, b)\n",
"\n",
"# Specify argument values for your pipeline run.\n",
"arguments = {'a': '7', 'b': '8'}\n",
"\n",
"# Create a pipeline run, using the client you initialized in a prior step.\n",
"client.create_run_from_pipeline_func(add_pipeline, arguments=arguments)"
]
},
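{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `add_component.yaml` file written through `output_component_file` in the earlier step can be loaded back\n",
"into a factory function. The following cell is a minimal sketch of that reuse, not a required step. It assumes\n",
"the Kubeflow Pipelines SDK v1 function `kfp.components.load_component_from_file` and that `add_component.yaml`\n",
"is still in the notebook's working directory. The names `add_op_from_file` and `reuse_pipeline` are\n",
"illustrative and are not used elsewhere in this document."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp.dsl as dsl\n",
"from kfp.components import load_component_from_file\n",
"\n",
"# Load the component specification saved earlier by create_component_from_func.\n",
"# Assumption: add_component.yaml is in the current working directory.\n",
"add_op_from_file = load_component_from_file('add_component.yaml')\n",
"\n",
"@dsl.pipeline(name='Reuse pipeline')\n",
"def reuse_pipeline(a='1', b='7'):\n",
"    # The loaded factory function is called the same way as `add_op`.\n",
"    add_op_from_file(a, b)"
]
},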
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Building Python function-based components\n",
"\n",
"Use the following instructions to build a Python function-based component:\n",
"\n",
"<a name=\"standalone\"></a>\n",
"\n",
"1. Define a standalone Python function. This function must meet the following\n",
"   requirements:\n",
"\n",
"   * It must not use any code declared outside of the function definition.\n",
"   * Import statements must be added inside the function. [Learn more about\n",
"     using and installing Python packages in your component](#packages).\n",
"   * Helper functions must be defined inside this function.\n",
"\n",
"1. Kubeflow Pipelines uses your function's inputs and outputs to define your\n",
"   component's interface. [Learn more about passing data between\n",
"   components](#pass-data). Your function's inputs and outputs must meet the\n",
"   following requirements:\n",
"\n",
"   * If the function accepts or returns large amounts of data or complex\n",
"     data types, you must pass that data as a file. [Learn more about using\n",
"     large amounts of data as inputs or outputs](#pass-by-file).\n",
"   * If the function accepts numeric values as parameters, the parameters\n",
"     must have type hints. Supported types are `int` and `float`. Otherwise,\n",
"     parameters are passed as strings.\n",
"   * If your component returns multiple small outputs (short strings,\n",
"     numbers, or booleans), annotate your function with the\n",
"     [`typing.NamedTuple`][named-tuple-hint] type hint and use the\n",
"     [`collections.namedtuple`][named-tuple] function to return your function's\n",
"     outputs as a new subclass of tuple. For an example, read\n",
"     [Passing parameters by value](#pass-by-value).\n",
"\n",
"1. (Optional.) If your function has complex dependencies, choose or build a\n",
"   container image for your Python function to run in. [Learn more about\n",
"   selecting or building your component's container image](#containers).\n",
"\n",
"1. Call [`kfp.components.create_component_from_func(func)`][create-component-from-func]\n",
"   to convert your function into a pipeline component.\n",
"\n",
"   * **func**: The Python function to convert.\n",
"   * **base_image**: (Optional.) Specify the Docker container image to run\n",
"     this function in. [Learn more about selecting or building a container\n",
"     image](#containers).\n",
"   * **output_component_file**: (Optional.) Writes your component definition\n",
"     to a file. You can use this file to share the component with colleagues\n",
"     or reuse it in different pipelines.\n",
"   * **packages_to_install**: (Optional.) A list of versioned Python\n",
"     packages to install before running your function.\n",
"\n",
"<a name=\"packages\"></a>\n",
"### Using and installing Python packages\n",
"\n",
"When Kubeflow Pipelines runs your pipeline, each component runs in a Docker\n",
"container on a Kubernetes Pod. To load the packages that your Python\n",
"function depends on, one of the following must be true:\n",
"\n",
"* The package must be installed on the container image.\n",
"* The package must be defined using the `packages_to_install` parameter of the\n",
"  [`kfp.components.create_component_from_func(func)`][create-component-from-func]\n",
"  function.\n",
"* Your function must install the package. For example, your function can use\n",
"  the [`subprocess` module][subprocess] to run a command like `pip install`\n",
"  that installs a package.\n",
"\n",
"<a name=\"containers\"></a>\n",
"### Selecting or building a container image\n",
"\n",
"Currently, if you do not specify a container image, your Python function-based\n",
"component uses the [`python:3.7` container image][python37]. If your function\n",
"has complex dependencies, you may benefit from using a container image that has\n",
"your dependencies preinstalled, or building a custom container image.\n",
"Preinstalling your dependencies shortens your component's run time, because\n",
"the component does not need to download and install packages each time it\n",
"runs.\n",
"\n",
"Many frameworks, such as [TensorFlow][tf-docker] and [PyTorch][pytorch-docker],\n",
"and cloud service providers offer prebuilt container images that have common\n",
"dependencies installed.\n",
"\n",
"If a prebuilt container is not available, you can build a custom container\n",
"image with your Python function's dependencies. For more information about\n",
"building a custom container, read the [Dockerfile reference guide in the Docker\n",
"documentation][dockerfile].\n",
"\n",
"If you build or select a container image, instead of using the default\n",
"container image, the container image must use Python 3.5 or later.\n",
"\n",
"<a name=\"pass-data\"></a>\n",
"### Understanding how data is passed between components\n",
"\n",
"When Kubeflow Pipelines runs your component, a container is started in a\n",
"Kubernetes Pod and your component's inputs are passed in as command-line\n",
"arguments. When your component has finished, the component’s outputs are\n",
"returned as files.\n",
"\n",
"Python function-based components make it easier to build pipeline components by\n",
"building the component specification for you. Python function-based components\n",
"also handle the complexity of passing inputs into your component and passing\n",
"your function's outputs back to your pipeline.\n",
"\n",
"The following sections describe how to pass parameters by value and by file.\n",
"\n",
"* Parameters that are passed by value include numbers, booleans, and short\n",
"  strings. Kubeflow Pipelines passes parameters to your component by value,\n",
"  by passing the values as command-line arguments.\n",
"* Parameters that are passed by file include CSV, images, and complex types.\n",
"  These files are stored in a location that is accessible to your component\n",
"  running on Kubernetes, such as a persistent volume claim or a cloud\n",
"  storage service. Kubeflow Pipelines passes parameters to your component by\n",
"  file, by passing their paths as a command-line argument.\n",
"\n",
"<a name=\"parameter-names\"></a>\n",
"#### Input and output parameter names\n",
"\n",
"When you use the Kubeflow Pipelines SDK to convert your Python function to a\n",
"pipeline component, the Kubeflow Pipelines SDK uses the function's interface\n",
"to define the interface of your component in the following ways:\n",
"\n",
"* Arguments annotated as outputs, such as\n",
"  [`kfp.components.OutputPath`][output-path], define output parameters.\n",
"* All other arguments define input parameters.\n",
"* The function's return value is used as an output parameter. If the return\n",
"  value is a [`collections.namedtuple`][named-tuple], the named tuple is used\n",
"  to return several small values.\n",
"\n",
"Since you can pass parameters between components as a value or as a path, the\n",
"Kubeflow Pipelines SDK removes common parameter suffixes that leak the\n",
"component's expected implementation. For example, a Python function-based\n",
"component that ingests data and outputs CSV data may have an output argument\n",
"that is defined as `csv_path: comp.OutputPath(str)`. In this case, the output\n",
"is the CSV data, not the path. So, the Kubeflow Pipelines SDK simplifies the\n",
"output name to `csv`.\n",
"\n",
"The Kubeflow Pipelines SDK uses the following rules to define the input and\n",
"output parameter names in your component's interface:\n",
"\n",
"* If the argument name ends with `_path` and the argument is annotated as an\n",
"  [`kfp.components.InputPath`][input-path] or\n",
"  [`kfp.components.OutputPath`][output-path], the parameter name is the\n",
"  argument name with the trailing `_path` removed.\n",
"* If the argument name ends with `_file`, the parameter name is the argument\n",
"  name with the trailing `_file` removed.\n",
"* If you return a single small value from your component using the `return`\n",
"  statement, the output parameter is named `output`.\n",
"* If you return several small values from your component by returning a\n",
"  [`collections.namedtuple`][named-tuple], the Kubeflow Pipelines SDK uses\n",
"  the tuple's field names as the output parameter names.\n",
"\n",
"Otherwise, the Kubeflow Pipelines SDK uses the argument name as the parameter\n",
"name.\n",
"\n",
"<a name=\"pass-by-value\"></a>\n",
"#### Passing parameters by value\n",
"\n",
"Python function-based components make it easier to pass parameters between\n",
"components by value (such as numbers, booleans, and short strings), by letting\n",
"you define your component’s interface by annotating your Python function. The\n",
"supported types are `int`, `float`, `bool`, and `str`. You can also pass\n",
"`list` or `dict` instances by value, if they contain small values, such as\n",
"`int`, `float`, `bool`, or `str` values. If you do not annotate your function,\n",
"these input parameters are passed as strings.\n",
"\n",
"If your component returns multiple outputs by value, annotate your function\n",
"with the [`typing.NamedTuple`][named-tuple-hint] type hint and use the\n",
"[`collections.namedtuple`][named-tuple] function to return your function's\n",
"outputs as a new subclass of `tuple`.\n",
"\n",
"You can also return metadata and metrics from your function.\n",
"\n",
"* Metadata helps you visualize pipeline results.\n",
"  [Learn more about visualizing pipeline metadata][kfp-visualize].\n",
"* Metrics help you compare pipeline runs.\n",
"  [Learn more about using pipeline metrics][kfp-metrics].\n",
"\n",
"The following example demonstrates how to return multiple outputs by value,\n",
"including component metadata and metrics.\n",
"\n",
"[python37]: https://hub.docker.com/layers/python/library/python/3.7/images/sha256-7eef781ed825f3b95c99f03f4189a8e30e718726e8490651fa1b941c6c815ad1?context=explore\n",
"[create-component-from-func]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/components.html#kfp.components.create_component_from_func\n",
"[subprocess]: https://docs.python.org/3/library/subprocess.html\n",
"[tf-docker]: https://www.tensorflow.org/install/docker\n",
"[pytorch-docker]: https://hub.docker.com/r/pytorch/pytorch/tags\n",
"[dockerfile]: https://docs.docker.com/engine/reference/builder/\n",
"[named-tuple-hint]: https://docs.python.org/3/library/typing.html#typing.NamedTuple\n",
"[named-tuple]: https://docs.python.org/3/library/collections.html#collections.namedtuple\n",
"[kfp-visualize]: https://www.kubeflow.org/docs/components/pipelines/sdk/output-viewer/\n",
"[kfp-metrics]: https://www.kubeflow.org/docs/components/pipelines/sdk/pipelines-metrics/\n",
"[input-path]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/components.html#kfp.components.InputPath\n",
"[output-path]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/components.html#kfp.components.OutputPath"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from typing import NamedTuple\n",
"def multiple_return_values_example(a: float, b: float) -> NamedTuple(\n",
"    'ExampleOutputs',\n",
"    [\n",
"        ('sum', float),\n",
"        ('product', float),\n",
"        ('mlpipeline_ui_metadata', 'UI_metadata'),\n",
"        ('mlpipeline_metrics', 'Metrics')\n",
"    ]):\n",
"    \"\"\"Example function that demonstrates how to return multiple values.\"\"\"\n",
"    sum_value = a + b\n",
"    product_value = a * b\n",
"\n",
"    # Export a sample tensorboard\n",
"    metadata = {\n",
"        'outputs' : [{\n",
"            'type': 'tensorboard',\n",
"            'source': 'gs://ml-pipeline-dataset/tensorboard-train',\n",
"        }]\n",
"    }\n",
"\n",
"    # Export two metrics\n",
"    metrics = {\n",
"        'metrics': [\n",
"            {\n",
"                'name': 'sum',\n",
"                'numberValue': float(sum_value),\n",
"            },{\n",
"                'name': 'product',\n",
"                'numberValue': float(product_value),\n",
"            }\n",
"        ]\n",
"    }\n",
"\n",
"    from collections import namedtuple\n",
"    example_output = namedtuple(\n",
"        'ExampleOutputs',\n",
"        ['sum', 'product', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])\n",
"    return example_output(sum_value, product_value, metadata, metrics)"
]
},
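{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because `multiple_return_values_example` is an ordinary Python function, you can call it directly to inspect\n",
"what it returns before converting it to a component. The following cell is a small sketch that uses arbitrary\n",
"sample inputs. The field names it prints are the names that, under the naming rules described earlier, become\n",
"the component's output parameter names, so a pipeline would read them as, for example, `task.outputs['sum']`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Call the function locally with arbitrary sample inputs.\n",
"result = multiple_return_values_example(2.0, 3.0)\n",
"print(result.sum, result.product)  # 5.0 6.0\n",
"\n",
"# The named tuple's field names match the component's output names.\n",
"print(result._fields)"
]
},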
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<a name=\"pass-by-file\"></a>\n",
"#### Passing parameters by file\n",
"\n",
"Python function-based components make it easier to pass files to your\n",
"component, or to return files from your component, by letting you annotate\n",
"your Python function's parameters to specify which parameters refer to a file.\n",
"Your Python function's parameters can refer to either input or output files.\n",
"If your parameter is an output file, Kubeflow Pipelines passes your function a\n",
"path or stream that you can use to store your output file.\n",
"\n",
"The following example accepts a file as an input and returns two files as outputs."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def split_text_lines(\n",
"    source_path: comp.InputPath(str),\n",
"    odd_lines_path: comp.OutputPath(str),\n",
"    even_lines_path: comp.OutputPath(str)):\n",
"    \"\"\"Splits a text file into two files, with even lines going to one file\n",
"    and odd lines to the other.\"\"\"\n",
"\n",
"    with open(source_path, 'r') as reader:\n",
"        with open(odd_lines_path, 'w') as odd_writer:\n",
"            with open(even_lines_path, 'w') as even_writer:\n",
"                while True:\n",
"                    line = reader.readline()\n",
"                    if line == \"\":\n",
"                        break\n",
"                    odd_writer.write(line)\n",
"                    line = reader.readline()\n",
"                    if line == \"\":\n",
"                        break\n",
"                    even_writer.write(line)"
]
},
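{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before converting `split_text_lines` to a component, you can try it locally, since the path annotations do not\n",
"change how the function behaves when it is called directly. The following cell is a minimal sketch of such a\n",
"check. The temporary directory, the file names, and the three sample lines are arbitrary choices made for this\n",
"illustration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import tempfile\n",
"\n",
"# Local check of split_text_lines using temporary files (illustrative names).\n",
"with tempfile.TemporaryDirectory() as tmp_dir:\n",
"    source = os.path.join(tmp_dir, 'source.txt')\n",
"    odd = os.path.join(tmp_dir, 'odd.txt')\n",
"    even = os.path.join(tmp_dir, 'even.txt')\n",
"    with open(source, 'w') as writer:\n",
"        writer.write('one\\ntwo\\nthree\\n')\n",
"\n",
"    # Called directly, the function receives plain path strings.\n",
"    split_text_lines(source, odd, even)\n",
"\n",
"    with open(odd) as reader:\n",
"        print(reader.read().splitlines())  # ['one', 'three']\n",
"    with open(even) as reader:\n",
"        print(reader.read().splitlines())  # ['two']"
]
},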
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this example, the inputs and outputs are defined as parameters of the\n",
"`split_text_lines` function. This lets Kubeflow Pipelines pass the path to the\n",
"source data file and the paths to the output data files into the function.\n",
"\n",
"To accept a file as an input parameter, use one of the following type annotations:\n",
"\n",
"* [`kfp.components.InputBinaryFile`][input-binary]: Use this annotation to\n",
"  specify that your function expects a parameter to be an\n",
"  [`io.BytesIO`][bytesio] instance that this function can read.\n",
"* [`kfp.components.InputPath`][input-path]: Use this annotation to specify that\n",
"  your function expects a parameter to be the path to the input file as\n",
"  a `str`.\n",
"* [`kfp.components.InputTextFile`][input-text]: Use this annotation to specify\n",
"  that your function expects a parameter to be an\n",
"  [`io.TextIOWrapper`][textiowrapper] instance that this function can read.\n",
"\n",
"To return a file as an output, use one of the following type annotations:\n",
"\n",
"* [`kfp.components.OutputBinaryFile`][output-binary]: Use this annotation to\n",
"  specify that your function expects a parameter to be an\n",
"  [`io.BytesIO`][bytesio] instance that this function can write to.\n",
"* [`kfp.components.OutputPath`][output-path]: Use this annotation to specify that\n",
"  your function expects a parameter to be the path to store the output file at\n",
"  as a `str`.\n",
"* [`kfp.components.OutputTextFile`][output-text]: Use this annotation to specify\n",
"  that your function expects a parameter to be an\n",
"  [`io.TextIOWrapper`][textiowrapper] instance that this function can write to.\n",
"\n",
"[input-binary]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/components.html#kfp.components.InputBinaryFile\n",
"[input-path]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/components.html#kfp.components.InputPath\n",
"[input-text]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/components.html#kfp.components.InputTextFile\n",
"[output-binary]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/components.html#kfp.components.OutputBinaryFile\n",
"[output-path]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/components.html#kfp.components.OutputPath\n",
"[output-text]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/components.html#kfp.components.OutputTextFile\n",
"[bytesio]: https://docs.python.org/3/library/io.html#io.BytesIO\n",
"[textiowrapper]: https://docs.python.org/3/library/io.html#io.TextIOWrapper\n",
"\n",
"## Example Python function-based component\n",
"\n",
"This section demonstrates how to build a Python function-based component that uses imports,\n",
"helper functions, and produces multiple outputs.\n",
"\n",
"1. Define your function. This example function uses the `numpy` package to calculate the quotient\n",
"   and remainder for a given dividend and divisor in a helper function. In addition to the quotient\n",
"   and remainder, the function also returns metadata for visualization and two metrics."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from typing import NamedTuple\n",
"\n",
"def my_divmod(\n",
"    dividend: float,\n",
"    divisor: float) -> NamedTuple(\n",
"        'MyDivmodOutput',\n",
"        [\n",
"            ('quotient', float),\n",
"            ('remainder', float),\n",
"            ('mlpipeline_ui_metadata', 'UI_metadata'),\n",
"            ('mlpipeline_metrics', 'Metrics')\n",
"        ]):\n",
"    '''Divides two numbers and calculates the quotient and remainder'''\n",
"\n",
"    # Import the numpy package inside the component function\n",
"    import numpy as np\n",
"\n",
"    # Define a helper function\n",
"    def divmod_helper(dividend, divisor):\n",
"        return np.divmod(dividend, divisor)\n",
"\n",
"    (quotient, remainder) = divmod_helper(dividend, divisor)\n",
"\n",
"    from tensorflow.python.lib.io import file_io\n",
"    import json\n",
"\n",
"    # Export a sample tensorboard\n",
"    metadata = {\n",
"        'outputs' : [{\n",
"            'type': 'tensorboard',\n",
"            'source': 'gs://ml-pipeline-dataset/tensorboard-train',\n",
"        }]\n",
"    }\n",
"\n",
"    # Export two metrics\n",
"    metrics = {\n",
"        'metrics': [{\n",
"            'name': 'quotient',\n",
"            'numberValue': float(quotient),\n",
"        },{\n",
"            'name': 'remainder',\n",
"            'numberValue': float(remainder),\n",
"        }]}\n",
"\n",
"    from collections import namedtuple\n",
"    divmod_output = namedtuple('MyDivmodOutput',\n",
"        ['quotient', 'remainder', 'mlpipeline_ui_metadata',\n",
"         'mlpipeline_metrics'])\n",
"    return divmod_output(quotient, remainder, json.dumps(metadata),\n",
"                         json.dumps(metrics))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"2. Test your function by running it directly, or with unit tests."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"my_divmod(100, 7)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"   This should return a result like the following:\n",
"\n",
"   ```\n",
"   MyDivmodOutput(quotient=14, remainder=2, mlpipeline_ui_metadata='{\"outputs\": [{\"type\": \"tensorboard\", \"source\": \"gs://ml-pipeline-dataset/tensorboard-train\"}]}', mlpipeline_metrics='{\"metrics\": [{\"name\": \"quotient\", \"numberValue\": 14.0}, {\"name\": \"remainder\", \"numberValue\": 2.0}]}')\n",
"   ```\n",
"\n",
"3. Use `kfp.components.create_component_from_func` to return a factory function that you can use to create\n",
"   [`kfp.dsl.ContainerOp`][container-op] class instances for your pipeline. This example also specifies the base container\n",
"   image to run this function in.\n",
"\n",
"[container-op]: https://kubeflow-pipelines.readthedocs.io/en/stable/source/dsl.html#kfp.dsl.ContainerOp"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"divmod_op = comp.create_component_from_func(\n",
"    my_divmod, base_image='tensorflow/tensorflow:1.11.0-py3')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"4. Define your pipeline. This example uses the `divmod_op` factory function and the `add_op`\n",
"   factory function from an earlier example."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp.dsl as dsl\n",
"@dsl.pipeline(\n",
"    name='Calculation pipeline',\n",
"    description='An example pipeline that performs arithmetic calculations.'\n",
")\n",
"def calc_pipeline(\n",
"    a='1',\n",
"    b='7',\n",
"    c='17',\n",
"):\n",
"    # Passes a pipeline parameter and a constant value as operation arguments.\n",
"    # The add_op factory function returns a dsl.ContainerOp class instance.\n",
"    add_task = add_op(a, 4)\n",
"\n",
"    # Passes the output of the add_task and a pipeline parameter as operation\n",
"    # arguments. For an operation with a single return value, the output\n",
"    # reference is accessed using `task.output` or\n",
"    # `task.outputs['output_name']`.\n",
"    divmod_task = divmod_op(add_task.output, b)\n",
"\n",
"    # For an operation with multiple return values, output references are\n",
"    # accessed as `task.outputs['output_name']`.\n",
"    result_task = add_op(divmod_task.outputs['quotient'], c)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"5. Compile and run your pipeline. [Learn more about compiling and running pipelines][build-pipelines].\n",
"\n",
"[build-pipelines]: https://www.kubeflow.org/docs/components/pipelines/sdk/build-pipeline/#compile-and-run-your-pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Specify pipeline argument values\n",
"arguments = {'a': '7', 'b': '8'}\n",
"\n",
"# Submit a pipeline run\n",
"client.create_run_from_pipeline_func(calc_pipeline, arguments=arguments)"
]
}
],
"metadata": {
"environment": {
"name": "tf2-2-3-gpu.2-3.m56",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/tf2-2-3-gpu.2-3:m56"
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 4
}