pipelines/components/pandas/Transform_DataFrame/_samples/sample_pipeline.py

59 lines
2.9 KiB
Python

import kfp
from kfp import components
# Component factories loaded from component.yaml files in the kubeflow/pipelines
# repository. Each URL embeds a full commit hash in its path.

# Dataset source: Chicago Taxi.
chicago_taxi_dataset_op = components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/e3337b8bdcd63636934954e592d4b32c95b49129/components/datasets/Chicago%20Taxi/component.yaml'
)

# Format converters: CSV -> Apache Parquet and Apache Parquet -> CSV.
convert_csv_to_apache_parquet_op = components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/d737c448723b9f541a3543012b4414c17b2eab5c/components/_converters/ApacheParquet/from_CSV/component.yaml'
)
convert_apache_parquet_to_csv_op = components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/af3eaf64e87313795cad1add9bfd9fa1e86af6de/components/_converters/ApacheParquet/to_CSV/component.yaml'
)

# pandas "Transform DataFrame" components, one per table format.
pandas_transform_parquet_op = components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/6162d55998b176b50267d351241100bb0ee715bc/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.yaml'
)
pandas_transform_csv_op = components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/6162d55998b176b50267d351241100bb0ee715bc/components/pandas/Transform_DataFrame/in_CSV_format/component.yaml'
)
def pandas_transform_pipeline():
    # Sample pipeline for the pandas "Transform DataFrame" components.
    #
    # Fetches a slice of the Chicago Taxi dataset as CSV, converts it to
    # Apache Parquet, then runs the same three transformations once through
    # the Parquet transform component and once through the CSV one.
    #
    # NOTE(review): documented with comments instead of a docstring on
    # purpose -- KFP may pick up a pipeline function's docstring as pipeline
    # metadata; confirm before converting this into a docstring.

    # Code snippets handed to the transform components; each one refers to
    # the table as `df`.
    transform_snippets = (
        # Classification table: prepend a boolean "was_tipped" column
        # (tips > 0) and remove the original "tips" column.
        '''df.insert(0, "was_tipped", df["tips"] > 0); del df["tips"]''',
        # Features: every column except "tips".
        '''df = df.drop(columns=["tips"])''',
        # Labels: only the "tips" column.
        '''df = df[["tips"]]''',
    )

    taxi_csv = chicago_taxi_dataset_op(
        where='trip_start_timestamp >= "2019-01-01" AND trip_start_timestamp < "2019-02-01"',
        select='tips,trip_seconds,trip_miles,pickup_community_area,dropoff_community_area,fare,tolls,extras,trip_total',
        limit=1000,
    ).output
    taxi_parquet = convert_csv_to_apache_parquet_op(taxi_csv).output

    # Parquet variant: transform, then convert each result back to CSV.
    for snippet in transform_snippets:
        transformed_parquet = pandas_transform_parquet_op(
            table=taxi_parquet,
            transform_code=snippet,
        ).output
        convert_apache_parquet_to_csv_op(transformed_parquet)

    # CSV variant: the same transformations applied directly to the CSV data.
    # The outputs are not consumed by any later step.
    for snippet in transform_snippets:
        _unused_csv_output = pandas_transform_csv_op(
            table=taxi_csv,
            transform_code=snippet,
        ).output
if __name__ == '__main__':
    # No explicit host is passed to the client while this stays None;
    # set it to the address of a specific KFP deployment to target that one.
    kfp_endpoint = None

    # Submit the sample pipeline as a run; it takes no pipeline arguments.
    client = kfp.Client(host=kfp_endpoint)
    client.create_run_from_pipeline_func(
        pandas_transform_pipeline,
        arguments={},
    )