59 lines
2.9 KiB
Python
59 lines
2.9 KiB
Python
import kfp
|
|
from kfp import components
|
|
|
|
|
|
chicago_taxi_dataset_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/e3337b8bdcd63636934954e592d4b32c95b49129/components/datasets/Chicago%20Taxi/component.yaml')
|
|
convert_csv_to_apache_parquet_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/d737c448723b9f541a3543012b4414c17b2eab5c/components/_converters/ApacheParquet/from_CSV/component.yaml')
|
|
convert_apache_parquet_to_csv_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/af3eaf64e87313795cad1add9bfd9fa1e86af6de/components/_converters/ApacheParquet/to_CSV/component.yaml')
|
|
|
|
pandas_transform_parquet_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/6162d55998b176b50267d351241100bb0ee715bc/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.yaml')
|
|
pandas_transform_csv_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/6162d55998b176b50267d351241100bb0ee715bc/components/pandas/Transform_DataFrame/in_CSV_format/component.yaml')
|
|
|
|
|
|
def pandas_transform_pipeline():
|
|
training_data_in_csv = chicago_taxi_dataset_op(
|
|
where='trip_start_timestamp >= "2019-01-01" AND trip_start_timestamp < "2019-02-01"',
|
|
select='tips,trip_seconds,trip_miles,pickup_community_area,dropoff_community_area,fare,tolls,extras,trip_total',
|
|
limit=1000,
|
|
).output
|
|
|
|
training_data_in_parquet = convert_csv_to_apache_parquet_op(training_data_in_csv).output
|
|
|
|
training_data_for_classification_in_parquet = pandas_transform_parquet_op(
|
|
table=training_data_in_parquet,
|
|
transform_code='''df.insert(0, "was_tipped", df["tips"] > 0); del df["tips"]''',
|
|
).output
|
|
convert_apache_parquet_to_csv_op(training_data_for_classification_in_parquet)
|
|
|
|
features_in_parquet = pandas_transform_parquet_op(
|
|
table=training_data_in_parquet,
|
|
transform_code='''df = df.drop(columns=["tips"])''',
|
|
).output
|
|
convert_apache_parquet_to_csv_op(features_in_parquet)
|
|
|
|
labels_in_parquet = pandas_transform_parquet_op(
|
|
table=training_data_in_parquet,
|
|
transform_code='''df = df[["tips"]]''',
|
|
).output
|
|
convert_apache_parquet_to_csv_op(labels_in_parquet)
|
|
|
|
training_data_for_classification_in_csv = pandas_transform_csv_op(
|
|
table=training_data_in_csv,
|
|
transform_code='''df.insert(0, "was_tipped", df["tips"] > 0); del df["tips"]''',
|
|
).output
|
|
|
|
features_in_csv = pandas_transform_csv_op(
|
|
table=training_data_in_csv,
|
|
transform_code='''df = df.drop(columns=["tips"])''',
|
|
).output
|
|
|
|
labels_in_csv = pandas_transform_csv_op(
|
|
table=training_data_in_csv,
|
|
transform_code='''df = df[["tips"]]''',
|
|
).output
|
|
|
|
|
|
if __name__ == '__main__':
|
|
kfp_endpoint=None
|
|
kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(pandas_transform_pipeline, arguments={})
|