#!/usr/bin/env python3
# Copyright 2020-2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# %% [markdown]
# # Data passing tutorial
# Data passing is the most important aspect of Pipelines.
#
# In Kubeflow Pipelines, pipeline authors compose pipelines by creating component instances (tasks) and connecting them together.
#
# Components have inputs and outputs. They can consume and produce arbitrary data.
#
# Pipeline authors establish connections between component tasks by connecting their data inputs and outputs - by passing the output of one task as an argument to another task's input.
#
# The system takes care of storing the data produced by components and later passing that data to other components for consumption as instructed by the pipeline.
#
# This tutorial shows how to create Python components that produce, consume, and transform data.
# It shows how to create data passing pipelines by instantiating components and connecting them together.
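#
# The code below defines two components, `preprocess` and `train`, and a
# pipeline that passes the outputs of the first into the inputs of the second.
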
# %%
from typing import Dict, List

from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component

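
# `@component` turns an ordinary Python function into a pipeline component;
# the annotations on the function's parameters declare the component's inputs
# and outputs.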
@component
def preprocess(
    # An input parameter of type string.
    message: str,
    # Use Output[T] to get a metadata-rich handle to the output artifact
    # of type `Dataset`.
    output_dataset_one: Output[Dataset],
    # A locally accessible filepath for another output artifact of type
    # `Dataset`.
    output_dataset_two_path: OutputPath('Dataset'),
    # A locally accessible filepath for an output parameter of type string.
    output_parameter_path: OutputPath(str),
    # A locally accessible filepath for an output parameter of type bool.
    output_bool_parameter_path: OutputPath(bool),
    # A locally accessible filepath for an output parameter of type dict.
    output_dict_parameter_path: OutputPath(Dict[str, int]),
    # A locally accessible filepath for an output parameter of type list.
    output_list_parameter_path: OutputPath(List[str]),
):
    """Dummy preprocessing step."""

    # Use Dataset.path to access a local file path for writing.
    # One can also use Dataset.uri to access the actual URI file path.
    with open(output_dataset_one.path, 'w') as f:
        f.write(message)

    # OutputPath is used to just pass the local file path of the output
    # artifact to the function.
    with open(output_dataset_two_path, 'w') as f:
        f.write(message)

    with open(output_parameter_path, 'w') as f:
        f.write(message)

    with open(output_bool_parameter_path, 'w') as f:
        # Use either `str()` or `json.dumps()` for bool values.
        f.write(str(True))

    # `json` is imported inside the function body so that the import is
    # available when the component code runs in its own container.
    import json
    with open(output_dict_parameter_path, 'w') as f:
        f.write(json.dumps({'A': 1, 'B': 2}))

    with open(output_list_parameter_path, 'w') as f:
        f.write(json.dumps(['a', 'b', 'c']))


@component
def train(
    # Use InputPath to get a locally accessible path for the input artifact
    # of type `Dataset`.
    dataset_one_path: InputPath('Dataset'),
    # Use Input[T] to get a metadata-rich handle to the input artifact
    # of type `Dataset`.
    dataset_two: Input[Dataset],
    # An input parameter of type string.
    message: str,
    # Use Output[T] to get a metadata-rich handle to the output artifact
    # of type `Model`.
    model: Output[Model],
    # An input parameter of type bool.
    input_bool: bool,
    # An input parameter of type dict.
    input_dict: Dict[str, int],
    # An input parameter of type List[str].
    input_list: List[str],
    # An input parameter of type int with a default value.
    num_steps: int = 100,
):
    """Dummy training step."""
    with open(dataset_one_path, 'r') as input_file:
        dataset_one_contents = input_file.read()

    with open(dataset_two.path, 'r') as input_file:
        dataset_two_contents = input_file.read()

    line = (f'dataset_one_contents: {dataset_one_contents} || '
            f'dataset_two_contents: {dataset_two_contents} || '
            f'message: {message} || '
            f'input_bool: {input_bool}, type {type(input_bool)} || '
            f'input_dict: {input_dict}, type {type(input_dict)} || '
            f'input_list: {input_list}, type {type(input_list)}\n')

    with open(model.path, 'w') as output_file:
        for i in range(num_steps):
            output_file.write('Step {}\n{}\n=====\n'.format(i, line))

    # model is an instance of the Model artifact, which has a .metadata
    # dictionary to store arbitrary metadata for the output artifact.
    model.metadata['accuracy'] = 0.9


# `pipeline_root` points at the root location where the pipeline's artifacts
# are stored; it is left empty here.
@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline(message: str = 'message'):
    preprocess_task = preprocess(message=message)
    # Each key of `preprocess_task.outputs` is the name of an output
    # parameter or artifact declared on `preprocess`.
    train_task = train(
        dataset_one_path=preprocess_task.outputs['output_dataset_one'],
        dataset_two=preprocess_task.outputs['output_dataset_two_path'],
        message=preprocess_task.outputs['output_parameter_path'],
        input_bool=preprocess_task.outputs['output_bool_parameter_path'],
        input_dict=preprocess_task.outputs['output_dict_parameter_path'],
        input_list=preprocess_task.outputs['output_list_parameter_path'],
    )


if __name__ == '__main__':
    compiler.Compiler().compile(data_passing_pipeline, __file__ + '.yaml')
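
# To run the compiled pipeline on a KFP backend, one option is to submit the
# generated YAML with the KFP SDK client. A minimal sketch, assuming a
# reachable Kubeflow Pipelines endpoint (the host URL below is a placeholder):
#
#   from kfp import Client
#
#   kfp_client = Client(host='http://localhost:8080')
#   kfp_client.create_run_from_pipeline_package(
#       '<path to the compiled YAML from above>',
#       arguments={'message': 'Hello, KFP!'})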