pipelines/samples/tutorials/Data passing in python comp.../Data passing in python comp...

146 lines
5.7 KiB
Python

#!/usr/bin/env python3
# Copyright 2020-2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# %% [markdown]
# # Data passing tutorial
# Data passing is the most important aspect of Pipelines.
#
# In Kubeflow Pipelines, pipeline authors compose pipelines by creating component instances (tasks) and connecting them together.
#
# Components have inputs and outputs. They can consume and produce arbitrary data.
#
# Pipeline authors establish connections between component tasks by connecting their data inputs and outputs - by passing the output of one task as an argument to another task's input.
#
# The system takes care of storing the data produced by components and later passing that data to other components for consumption as instructed by the pipeline.
#
# This tutorial shows how to create Python components that produce, consume and transform data.
# It shows how to create data passing pipelines by instantiating components and connecting them together.
# %%
from typing import Dict, List
from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component
@component
def preprocess(
    # Plain input parameter of type string.
    message: str,
    # Output[T] hands the function a metadata-rich handle to an output
    # artifact; here T is `Dataset`.
    output_dataset_one: Output[Dataset],
    # OutputPath('Dataset') hands the function only a locally accessible
    # file path for a second output artifact of type `Dataset`.
    output_dataset_two_path: OutputPath('Dataset'),
    # Locally accessible file path for a string output parameter.
    output_parameter_path: OutputPath(str),
    # Locally accessible file path for a bool output parameter.
    output_bool_parameter_path: OutputPath(bool),
    # Locally accessible file path for a dict output parameter.
    output_dict_parameter_path: OutputPath(Dict[str, int]),
    # Locally accessible file path for a list output parameter.
    output_list_parameter_path: OutputPath(List[str]),
):
    """Dummy preprocessing step."""
    # json is imported locally, inside the component function body.
    import json

    # Each entry pairs a destination file path with the text written to it.
    # The entries are written in the order listed.
    pending_writes = (
        # Dataset.path is the local file path to write to; Dataset.uri
        # would give the actual URI of the artifact instead.
        (output_dataset_one.path, message),
        # An OutputPath argument already *is* the local file path of the
        # output artifact.
        (output_dataset_two_path, message),
        (output_parameter_path, message),
        # Bool values may be serialized with either `str()` or
        # `json.dumps()`.
        (output_bool_parameter_path, str(True)),
        # Dict and list parameters are serialized as JSON text.
        (output_dict_parameter_path, json.dumps({'A': 1, 'B': 2})),
        (output_list_parameter_path, json.dumps(['a', 'b', 'c'])),
    )
    for destination, payload in pending_writes:
        with open(destination, 'w') as sink:
            sink.write(payload)
@component
def train(
    # InputPath('Dataset') hands the function a locally accessible path for
    # an input artifact of type `Dataset`.
    dataset_one_path: InputPath('Dataset'),
    # Input[T] hands the function a metadata-rich handle to an input
    # artifact; here T is `Dataset`.
    dataset_two: Input[Dataset],
    # Input parameter of type string.
    message: str,
    # Output[T] hands the function a metadata-rich handle to an output
    # artifact; here T is `Model`.
    model: Output[Model],
    # Input parameter of type bool.
    input_bool: bool,
    # Input parameter of type dict.
    input_dict: Dict[str, int],
    # Input parameter of type List[str].
    input_list: List[str],
    # Input parameter of type int; defaults to 100 when not supplied.
    num_steps: int = 100,
):
    """Dummy Training step."""
    # Read both input datasets in full: the first through its plain path,
    # the second through the artifact handle's `.path`.
    with open(dataset_one_path, 'r') as first_reader:
        dataset_one_contents = first_reader.read()
    with open(dataset_two.path, 'r') as second_reader:
        dataset_two_contents = second_reader.read()

    # One report line echoing every input (and the runtime type of the
    # non-string parameters), with fields separated by ' || '.
    report_fields = (
        f'dataset_one_contents: {dataset_one_contents}',
        f'dataset_two_contents: {dataset_two_contents}',
        f'message: {message}',
        f'input_bool: {input_bool}, type {type(input_bool)}',
        f'input_dict: {input_dict}, type {type(input_dict)}',
        f'input_list: {input_list}, type {type(input_list)} \n',
    )
    line = ' || '.join(report_fields)

    # The "model" file is the same report repeated once per step, each
    # repetition prefixed with its step index.
    with open(model.path, 'w') as model_file:
        model_file.writelines(
            f'Step {step}\n{line}\n=====\n' for step in range(num_steps))

    # `model` is a Model artifact; its `.metadata` dictionary can hold
    # arbitrary metadata about the output artifact.
    model.metadata['accuracy'] = 0.9
# Two-step pipeline: run `preprocess` on the message, then feed every one of
# its outputs into the matching input of `train`.
@dsl.pipeline(name='tutorial-data-passing', pipeline_root='')
def data_passing_pipeline(message: str = 'message'):
    producer = preprocess(message=message)
    produced = producer.outputs

    # Artifacts and parameters are wired the same way: pass an upstream
    # output as the argument for a downstream input. `num_steps` is not
    # passed, so `train` uses its default.
    train(
        dataset_one_path=produced['output_dataset_one'],
        dataset_two=produced['output_dataset_two_path'],
        message=produced['output_parameter_path'],
        input_bool=produced['output_bool_parameter_path'],
        input_dict=produced['output_dict_parameter_path'],
        input_list=produced['output_list_parameter_path'],
    )
if __name__ == '__main__':
    # Compile the pipeline to a YAML file next to this script; the file name
    # is this script's path with '.yaml' appended.
    package_path = f'{__file__}.yaml'
    compiler.Compiler().compile(
        pipeline_func=data_passing_pipeline,
        package_path=package_path,
    )