# Build a Pipeline
> A tutorial on building pipelines to orchestrate your ML workflow


A Kubeflow pipeline is a portable and scalable definition of a machine learning
(ML) workflow. Each step in your ML workflow, such as preparing data or
training a model, is an instance of a pipeline component. This document
provides an overview of pipeline concepts and best practices, and instructions
describing how to build an ML pipeline.

## Before you begin

1.  Run the following command to install the Kubeflow Pipelines SDK. If you run this command in a Jupyter
    notebook, restart the kernel after installing the SDK. 

In [None]:
!pip install kfp --upgrade

2.  Import the `kfp` and `kfp.components` packages.

In [None]:
import kfp
import kfp.components as comp

## Understanding pipelines

A Kubeflow pipeline is a portable and scalable definition of an ML workflow,
based on containers. A pipeline is composed of a set of input parameters and a
list of the steps in this workflow. Each step in a pipeline is an instance of a
component, which is represented as an instance of 
[`ContainerOp`][container-op].

You can use pipelines to:

*   Orchestrate repeatable ML workflows.
*   Accelerate experimentation by running a workflow with different sets of
    hyperparameters.

### Understanding pipeline components

A pipeline component is a containerized application that performs one step in a
pipeline's workflow. Pipeline components are defined in
[component specifications][component-spec], which define the following:

*   The component's interface: its inputs and outputs.
*   The component's implementation, the container image and the command to
    execute.
*   The component's metadata, such as the name and description of the
    component.

You can build components by [defining a component specification for a
containerized application][component-dev], or you can [use the Kubeflow
Pipelines SDK to generate a component specification for a Python
function][python-function-component]. You can also [reuse prebuilt components
in your pipeline][prebuilt-components]. 

### Understanding the pipeline graph

Each step in your pipeline's workflow is an instance of a component. When
you define your pipeline, you specify the source of each step's inputs. A
step's inputs can come from the pipeline's input arguments, from constants,
or from the outputs of other steps in the pipeline. Kubeflow Pipelines uses
these dependencies to define your pipeline's workflow as a graph.

For example, consider a pipeline with the following steps: ingest data,
generate statistics, preprocess data, and train a model. The following
describes the data dependencies between each step.

*   **Ingest data**: This step loads data from an external source that is
    specified by a pipeline argument, and it outputs a dataset. Because this
    step does not depend on the output of any other step, it can run first.
*   **Generate statistics**: This step uses the ingested dataset to generate
    and output a set of statistics. Since this step depends on the dataset
    produced by the ingest data step, it must run after the ingest data step.
*   **Preprocess data**: This step preprocesses the ingested dataset and
    transforms the data into a preprocessed dataset. Since this step depends
    on the dataset produced by the ingest data step, it must run after the
    ingest data step.
*   **Train a model**: This step trains a model using the preprocessed dataset,
    the generated statistics, and pipeline parameters, such as the learning
    rate. Since this step depends on the preprocessed data and the generated
    statistics, it must run after both the preprocess data and generate
    statistics steps are complete.

Since the generate statistics and preprocess data steps both depend on the
ingested data, the generate statistics and preprocess data steps can run in
parallel. All other steps are executed once their data dependencies are
available.
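
The scheduling rule above can be sketched in plain Python. The following example is illustrative only and is not part of the Kubeflow Pipelines SDK: it models the four example steps as a dependency graph (the step names are hypothetical) and groups them into stages with a topological sort (Kahn's algorithm). Every step in a stage has all of its dependencies satisfied by earlier stages, so steps in the same stage can run in parallel.

```python
from typing import Dict, List, Set


def execution_stages(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group steps into stages; all steps in one stage can run in parallel.

    `deps` maps each step name to the set of steps whose outputs it consumes.
    """
    remaining = {step: set(parents) for step, parents in deps.items()}
    stages = []
    while remaining:
        # Steps whose dependencies have all completed are ready to run.
        ready = sorted(step for step, parents in remaining.items() if not parents)
        if not ready:
            raise ValueError("unsatisfiable dependencies (cycle or unknown step)")
        stages.append(ready)
        for step in ready:
            del remaining[step]
        # Mark the finished steps as satisfied for every downstream step.
        for parents in remaining.values():
            parents.difference_update(ready)
    return stages


# Steps and data dependencies from the example above (names are illustrative).
pipeline_deps = {
    "ingest_data": set(),
    "generate_statistics": {"ingest_data"},
    "preprocess_data": {"ingest_data"},
    "train_model": {"generate_statistics", "preprocess_data"},
}

for i, stage in enumerate(execution_stages(pipeline_deps), start=1):
    print(f"stage {i}: {', '.join(stage)}")
```

Running this prints three stages: `ingest_data` alone, then `generate_statistics` and `preprocess_data` together, then `train_model`, which matches the parallelism described above.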

## Designing your pipeline

When designing your pipeline, think about how to split your ML workflow into
pipeline components. The process of splitting an ML workflow into pipeline
components is similar to the process of splitting a monolithic script into
testable functions. The following rules can help you define the components
that you need to build your pipeline.

*   Components should have a single responsibility. Having a single
    responsibility makes it easier to test and reuse a component. For example,
    if you have a component that loads data, you can reuse it for other tasks
    that load data. If you have a component that loads and transforms a
    dataset, the component is less useful because you can use it only when you
    need to both load and transform that dataset.

*   Reuse components when possible. Kubeflow Pipelines provides [components for
    common pipeline tasks and for access to cloud
    services][prebuilt-components].

*   Consider what you need to know to debug your pipeline and research the
    lineage of the models that your pipeline produces. Kubeflow Pipelines
    stores the inputs and outputs of each pipeline step. By interrogating the
    artifacts produced by a pipeline run, you can better understand the
    variations in model quality between runs or track down bugs in your
    workflow.
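
As a sketch of the single-responsibility rule, the following example splits a hypothetical load-and-transform script into two functions: `load_rows` only reads a header-less CSV file, and `scale_column` only transforms rows that are already in memory. Both names are illustrative, not part of any Kubeflow API. Because the transform never touches the file system, you can test it without input files, and you can reuse `load_rows` wherever a step needs to read CSV data.

```python
import csv
from typing import List


def load_rows(path: str) -> List[List[str]]:
    """Load step: read a header-less CSV file into a list of rows."""
    with open(path, newline="") as f:
        return list(csv.reader(f))


def scale_column(rows: List[List[str]], column: int, factor: float) -> List[List[str]]:
    """Transform step: return a copy of `rows` with one numeric column multiplied by `factor`."""
    scaled = []
    for row in rows:
        new_row = list(row)  # Copy so the caller's rows are not mutated.
        new_row[column] = str(float(row[column]) * factor)
        scaled.append(new_row)
    return scaled


# The transform can be tested in isolation, with no file I/O at all.
print(scale_column([["1.5", "setosa"], ["2.0", "virginica"]], column=0, factor=2.0))
```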

In general, you should design your components with composability in mind. 

Pipelines are composed of component instances, also called steps. Steps can
define their inputs as depending on the output of another step. The
dependencies between steps define the pipeline workflow graph.

### Building pipeline components

Kubeflow pipeline components are containerized applications that perform a
step in your ML workflow. Here are the ways that you can define pipeline
components:

*   If you have a containerized application that you want to use as a
    pipeline component, create a component specification to define this
    container image as a pipeline component.
    
    This option provides the flexibility to include code written in any
    language in your pipeline, so long as you can package the application
    as a container image. Learn more about [building pipeline
    components][component-dev].

*   If your component code can be expressed as a Python function, [evaluate
    whether your component can be built as a Python function-based
    component][python-function-component]. The Kubeflow Pipelines SDK makes it
    easier to build lightweight Python function-based components by saving you
    the effort of creating a component specification.

Whenever possible, [reuse prebuilt components][prebuilt-components] to save
yourself the effort of building custom components.

The example in this guide demonstrates how to build a pipeline that uses a
Python function-based component and reuses a prebuilt component.

### Understanding how data is passed between components

When Kubeflow Pipelines runs a component, a container image is started in a
Kubernetes Pod and your component’s inputs are passed in as command-line
arguments. When your component has finished, the component's outputs are
returned as files.

In your component's specification, you define the component's inputs and
outputs and how the input values and output paths are passed to your program
as command-line arguments. You can pass small inputs, such as short strings or
numbers, to your component by value. Large inputs, such as datasets, must be
passed to your component as file paths. Outputs are written to the paths that
Kubeflow Pipelines provides.
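
The following sketch shows the program side of this contract for a hypothetical containerized component written in Python. The flag names (`--num-rows`, `--input-path`, `--output-path`) and the row-sampling behavior are assumptions for illustration: the small input arrives by value, the dataset arrives as a file path, and the program writes its result to an output path it is given.

```python
import argparse
import os
import tempfile
from typing import List, Optional


def main(argv: Optional[List[str]] = None) -> None:
    parser = argparse.ArgumentParser(description="Hypothetical row-sampling component.")
    # Small input passed by value: the number itself arrives on the command line.
    parser.add_argument("--num-rows", type=int, required=True)
    # Large input passed as a file path: only the path arrives on the command line.
    parser.add_argument("--input-path", required=True)
    # Output location supplied by the orchestrator: the program must write here.
    parser.add_argument("--output-path", required=True)
    args = parser.parse_args(argv)

    with open(args.input_path) as src:
        lines = src.readlines()[: args.num_rows]

    # The output path's parent directory may not exist yet, so create it.
    parent = os.path.dirname(args.output_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(args.output_path, "w") as dst:
        dst.writelines(lines)


# Local demo: simulate the paths an orchestrator would provide.
with tempfile.TemporaryDirectory() as tmp:
    in_path = os.path.join(tmp, "inputs", "dataset.csv")
    out_path = os.path.join(tmp, "outputs", "sample", "data.csv")
    os.makedirs(os.path.dirname(in_path))
    with open(in_path, "w") as f:
        f.write("5.1,3.5\n4.9,3.0\n4.7,3.2\n")
    main(["--num-rows", "2", "--input-path", in_path, "--output-path", out_path])
    with open(out_path) as f:
        sampled = f.read()

print(sampled, end="")
```

In a component specification, you would bind these flags to placeholders such as `{inputValue: ...}`, `{inputPath: ...}`, and `{outputPath: ...}`, and the container's entry point would call `main()` with no arguments so that `argparse` reads `sys.argv`.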

Python function-based components make it easier to build pipeline components
by building the component specification for you. Python function-based
components also handle the complexity of passing inputs into your component
and passing your function’s outputs back to your pipeline.

Learn more about how [Python function-based components handle inputs and
outputs][python-function-component-data-passing]. 

## Getting started building a pipeline

The following sections demonstrate how to get started building a Kubeflow
pipeline by walking through the process of converting a Python script into
a pipeline.

### Design your pipeline

The following steps walk through some of the design decisions you may face
when designing a pipeline.

1.  Evaluate the process. In the following example, a Python function downloads
    a zipped tar file (`.tar.gz`) that contains several CSV files from a
    public website. The function extracts the CSV files and then merges them
    into a single file.

[container-op]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.ContainerOp
[component-spec]: https://www.kubeflow.org/docs/components/pipelines/reference/component-spec/
[python-function-component]: https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/
[component-dev]: https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/
[python-function-component-data-passing]: https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/#understanding-how-data-is-passed-between-components
[prebuilt-components]: https://www.kubeflow.org/docs/examples/shared-resources/

In [None]:
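import glob
import tarfile
import urllib.request

import pandas as pd


def download_and_merge_csv(url: str, output_csv: str):
    # Stream the .tar.gz archive from the URL and extract it into ./data.
    with urllib.request.urlopen(url) as res:
        with tarfile.open(fileobj=res, mode="r|gz") as tar:
            tar.extractall('data')
    # Read every extracted CSV file (the files have no header row) and
    # concatenate them; sorting makes the row order deterministic.
    df = pd.concat(
        [pd.read_csv(csv_file, header=None)
         for csv_file in sorted(glob.glob('data/*.csv'))])
    # Write the merged data without the DataFrame index or a header row.
    df.to_csv(output_csv, index=False, header=False)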

2.  Run the following Python command to test the function. 

In [None]:
download_and_merge_csv(
    url='https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz', 
    output_csv='merged_data.csv')

3.  Run the following to print the first few rows of the
    merged CSV file.

In [None]:
!head merged_data.csv

4.  Design your pipeline. For example, consider the following pipeline designs.

    *   Implement the pipeline using a single step. In this case, the pipeline
        contains one component that works similarly to the example function.
        This is a straightforward function, and implementing a single-step
        pipeline is a reasonable approach in this case.
        
        The downside of this approach is that the zipped tar file would not be
        an artifact of your pipeline runs. Without this artifact, it could be
        harder to debug this component in production.
        
    *   Implement this as a two-step pipeline. The first step downloads a file
        from a website. The second step extracts the CSV files from a zipped
        tar file and merges them into a single file. 
        
        This approach has a few benefits:
        
        *   You can reuse the [Web Download component][web-download-component]
            to implement the first step.
        *   Each step has a single responsibility, which makes the components
            easier to reuse.
        *   The zipped tar file is an artifact of the first pipeline step.
            This means that you can examine this artifact when debugging
            pipelines that use this component.
    
    This example implements a two-step pipeline.

### Build your pipeline components

        
1.  Build your pipeline components. This example modifies the initial script to
    extract the contents of a zipped tar file, merge the CSV files that were
    contained in the zipped tar file, and return the merged CSV file.
    
    This example builds a Python function-based component. You can also package
    your component's code as a Docker container image and define the component
    using a ComponentSpec.
    
    In this case, the following modifications were made to the original
    function.

    *   The file download logic was removed. The path to the zipped tar file
        is passed as an argument to this function.
    *   The import statements were moved inside the function. Python
        function-based components require standalone Python functions, so
        any required import statements and any helper functions must be
        defined within the function.
        Learn more about [building Python function-based
        components][python-function-components].
    *   The function's arguments are annotated with
        [`kfp.components.InputPath`][input-path] and
        [`kfp.components.OutputPath`][output-path]. These
        annotations tell Kubeflow Pipelines to provide the path to the
        zipped tar file and to provide a path where your function stores the
        merged CSV file.
        
    The following example shows the updated `merge_csv` function.

[web-download-component]: https://github.com/kubeflow/pipelines/blob/master/components/web/Download/component.yaml
[python-function-components]: https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/
[input-path]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html?highlight=inputpath#kfp.components.InputPath
[output-path]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html?highlight=outputpath#kfp.components.OutputPath

In [None]:
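def merge_csv(file_path: comp.InputPath('Tarball'),
              output_csv: comp.OutputPath('CSV')):
    # Imports live inside the function because function-based
    # components must be standalone Python functions.
    import glob
    import tarfile

    import pandas as pd

    # Kubeflow Pipelines passes in the path to the downloaded tarball.
    with tarfile.open(name=file_path, mode="r|gz") as tar:
        tar.extractall('data')
    df = pd.concat(
        [pd.read_csv(csv_file, header=None)
         for csv_file in sorted(glob.glob('data/*.csv'))])
    # Write the merged CSV to the output path that Kubeflow Pipelines provides.
    df.to_csv(output_csv, index=False, header=False)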

2.  Use [`kfp.components.create_component_from_func`][create_component_from_func]
    to return a factory function that you can use to create pipeline steps.
    This example also specifies the base container image to run this function
    in, the path to save the component specification to, and a list of PyPI
    packages that need to be installed in the container at runtime.

[create_component_from_func]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html#kfp.components.create_component_from_func

In [None]:
create_step_merge_csv = kfp.components.create_component_from_func(
    func=merge_csv,
    output_component_file='component.yaml', # This is optional. It saves the component spec for future use.
    base_image='python:3.7',
    packages_to_install=['pandas==1.1.4'])
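
Because a Python function-based component starts out as an ordinary Python function, it is worth checking the extract-and-merge logic locally before you build the pipeline. The following sketch does that without network access: it builds a small `.tar.gz` archive in a temporary directory and runs a standard-library stand-in for `merge_csv`. The helpers `make_tarball` and `merge_csv_local` are hypothetical; the stand-in uses the `csv` module instead of pandas so that it has no third-party dependencies, and, like `merge_csv`, it assumes header-less CSV files.

```python
import csv
import glob
import io
import os
import tarfile
import tempfile


def make_tarball(tar_path: str, files: dict) -> None:
    """Write a .tar.gz archive containing the given {name: text} files."""
    with tarfile.open(tar_path, mode="w:gz") as tar:
        for name, text in files.items():
            data = text.encode("utf-8")
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))


def merge_csv_local(file_path: str, output_csv: str, work_dir: str) -> None:
    """Stdlib stand-in for merge_csv: extract the CSV files, then concatenate their rows."""
    extract_dir = os.path.join(work_dir, "data")
    with tarfile.open(name=file_path, mode="r|gz") as tar:
        # Use the safer "data" extraction filter where this Python version has it.
        if hasattr(tarfile, "data_filter"):
            tar.extractall(extract_dir, filter="data")
        else:
            tar.extractall(extract_dir)
    with open(output_csv, "w", newline="") as out:
        writer = csv.writer(out)
        for csv_file in sorted(glob.glob(os.path.join(extract_dir, "*.csv"))):
            with open(csv_file, newline="") as f:
                writer.writerows(csv.reader(f))


with tempfile.TemporaryDirectory() as tmp:
    tar_path = os.path.join(tmp, "sample.tar.gz")
    make_tarball(tar_path, {"a.csv": "1,2\n3,4\n", "b.csv": "5,6\n"})
    out_path = os.path.join(tmp, "merged.csv")
    merge_csv_local(tar_path, out_path, work_dir=tmp)
    with open(out_path, newline="") as f:
        merged_rows = list(csv.reader(f))

print(merged_rows)
```

You could run the same check against `merge_csv` itself by calling it with the two paths, since its `InputPath` and `OutputPath` annotations do not change how the function runs when you call it directly.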

### Build your pipeline

1.  Use [`kfp.components.load_component_from_url`][load_component_from_url]
    to load the component specification YAML for any components that you are
    reusing in this pipeline.

[load_component_from_url]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.components.html?highlight=load_component_from_url#kfp.components.load_component_from_url

In [None]:
web_downloader_op = kfp.components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/web/Download/component.yaml')

2.  Define your pipeline as a Python function. 

    Your pipeline function's arguments define your pipeline's parameters. Use
    pipeline parameters to experiment with different hyperparameters, such as
    the learning rate used to train a model, or pass run-level inputs, such as
    the path to an input file, into a pipeline run.
    
    Use the factory functions created by
    `kfp.components.create_component_from_func` and
    `kfp.components.load_component_from_url` to create your pipeline's tasks. 
    The inputs to the component factory functions can be pipeline parameters,
    the outputs of other tasks, or a constant value. In this case, the
    `web_downloader_task` task uses the `url` pipeline parameter, and the
    `merge_csv_task` uses the `data` output of the `web_downloader_task`.
    

In [None]:
# Define a pipeline and create a task from a component:
def my_pipeline(url):
  web_downloader_task = web_downloader_op(url=url)
  merge_csv_task = create_step_merge_csv(file=web_downloader_task.outputs['data'])
  # The outputs of the merge_csv_task can be referenced using the
  # merge_csv_task.outputs dictionary: merge_csv_task.outputs['output_csv']

### Compile and run your pipeline

After defining the pipeline in Python as described in the preceding section, use one of the following options to compile the pipeline and submit it to the Kubeflow Pipelines service.

#### Option 1: Compile and then upload in the UI

1.  Run the following to compile your pipeline and save it as `pipeline.yaml`. 


In [None]:
kfp.compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path='pipeline.yaml')

2.  Upload and run your `pipeline.yaml` using the Kubeflow Pipelines user interface.
    See the guide to [getting started with the UI][quickstart].

[quickstart]: https://www.kubeflow.org/docs/components/pipelines/overview/quickstart

#### Option 2: Run the pipeline using the Kubeflow Pipelines SDK client

1.  Create an instance of the [`kfp.Client` class][kfp-client] by following the steps in [connecting to Kubeflow Pipelines using the SDK client][connect-api].

[kfp-client]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.client.html#kfp.Client
[connect-api]: https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api

In [None]:
client = kfp.Client()  # Change the arguments (for example, host) to match your deployment.

2.  Run the pipeline using the `kfp.Client` instance:

In [None]:
client.create_run_from_pipeline_func(
    my_pipeline,
    arguments={
        'url': 'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'
    })


## Next steps

*   Learn about advanced pipeline features, such as [authoring recursive
    components][recursion] and [using conditional execution in a
    pipeline][conditional].
*   Learn how to [manipulate Kubernetes resources in a
    pipeline][k8s-resources] (Experimental).

[conditional]: https://github.com/kubeflow/pipelines/blob/master/samples/tutorials/DSL%20-%20Control%20structures/DSL%20-%20Control%20structures.py
[recursion]: https://www.kubeflow.org/docs/components/pipelines/sdk/dsl-recursion/
[k8s-resources]: https://www.kubeflow.org/docs/components/pipelines/sdk/manipulate-resources/