examples/store-sales-forecasting-kag.../store-sales-kfp.ipynb

635 lines
21 KiB
Plaintext

{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"# Kaggle Getting Started Prediction Competition: Store Sales - Time Series Forecasting"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"In this [competition](https://www.kaggle.com/competitions/store-sales-time-series-forecasting), we will use time-series forecasting to forecast store sales on data from Corporación Favorita, a large Ecuadorian-based grocery retailer. The notebook is a buildup of hands-on-exercises presented in Kaggle Learn course of [Time Series Course](https://www.kaggle.com/learn/time-series) where you will learn to leverage periodic trends for forecasting as well as combine different models such as linear regression and XGBoost to perfect your forecasting. For the purpose of this tutorial we are looking at periodic trend for forecasting."
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"Install the kfp package by uncommenting the below line and restarting the kernel. Do comment it out once the kernel is restarted"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Install the kfp \n",
"# !pip install kfp --upgrade "
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"## Imports"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"Following are the imports required to build the pipeline and pass the data between components for building up the kubeflow pipeline"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import kfp\n",
"from kfp.components import func_to_container_op\n",
"import kfp.components as comp\n",
"# from typing import NamedTuple"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"All the essential imports required in a pipeline component are put together in a list which then is passed on to each pipeline component. Though this might not be efficient when you are dealing with lot of packages, so in cases with many packages and dependencies you can go for docker image which then can be passed to each pipeline component"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import_packages = ['pandas', 'sklearn', 'statsmodels', 'kaggle','pyarrow','scikit-learn']"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"In the following implementation of kubeflow pipeline we are making use of [lightweight python function components](https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/) to build up the pipeline. The data is passed between component instances(tasks) using InputPath and OutputPath. Different ways of storing and passing data between the pipelines have been explored in the following notebook."
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"The pipeline is divided into five components\n",
"\n",
" 1. Download the data from Kaggle\n",
" 2. Load the data\n",
" 3. Create features\n",
" 4. Train and evaluate the model\n",
" 5. Forecast Sales"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"## Download data from Kaggle"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"Follow the prerequisites information in the Github README.md on how to create a secret for our credentials and mounting them to our pod using a pod-default resource. Once you have the secret mounted, you can use it to acccess the Username and key to download the files you need from kaggle. "
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def download_kaggle_dataset(path:str)->str:\n",
" \n",
" import os\n",
"\n",
" # Retrieve the credentials from the secret mounted and \n",
" # bring it onto our working environment\n",
" with open('/secret/kaggle/KAGGLE_KEY', 'r') as file:\n",
" kaggle_key = file.read().rstrip()\n",
" with open('/secret/kaggle/KAGGLE_USERNAME', 'r') as file:\n",
" kaggle_user = file.read().rstrip()\n",
" os.environ['KAGGLE_USERNAME'] = kaggle_user \n",
" os.environ['KAGGLE_KEY'] = kaggle_key\n",
"\n",
" os.chdir(os.getcwd())\n",
" os.system(\"mkdir \" + path)\n",
" os.chdir(path)\n",
" \n",
" # Using Kaggle Public API to download the datasets\n",
" import kaggle \n",
" from kaggle.api.kaggle_api_extended import KaggleApi\n",
" \n",
" api = KaggleApi()\n",
" api.authenticate()\n",
" \n",
" # Download the required files individually. You can also choose to download the entire dataset if you want to work with images as well. \n",
" api.competition_download_files('store-sales-time-series-forecasting') \n",
" \n",
" return path "
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"download_data_op = func_to_container_op(download_kaggle_dataset, packages_to_install = import_packages)"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"## Load the data"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"def load_data(path:str, train_data_path: comp.OutputPath(), holidays_data_path: comp.OutputPath())->str:\n",
" \n",
" # Imports\n",
" import pandas as pd\n",
" import os\n",
" from zipfile import ZipFile \n",
" from pyarrow import parquet\n",
" import pickle\n",
" \n",
" # Moving to current working directory and creating a new directory\n",
" os.chdir(os.getcwd())\n",
" print(os.listdir(path))\n",
" os.chdir(path)\n",
" \n",
" # Extracting all files from competition zip file\n",
" zipfile = ZipFile('store-sales-time-series-forecasting.zip', 'r')\n",
" zipfile.extractall()\n",
" zipfile.close()\n",
" \n",
" # Converting to pandas dataframe \n",
" train_data_filepath = path + \"/train.csv\"\n",
" test_data_filepath = path + \"/test.csv\"\n",
" holidays_filepath = path + \"/holidays_events.csv\"\n",
"\n",
" # Read the csv files into dataframes\n",
" # Training data\n",
" train_sales = pd.read_csv(train_data_filepath,\n",
" usecols=['store_nbr', 'family', 'date', 'sales'],\n",
" dtype={\n",
" 'store_nbr': 'category',\n",
" 'family': 'category',\n",
" 'sales': 'float32',\n",
" },\n",
" parse_dates=['date'],\n",
" infer_datetime_format=True,\n",
" )\n",
" train_sales['date'] = train_sales.date.dt.to_period('D')\n",
" train_sales = train_sales.set_index(['store_nbr', 'family', 'date']).sort_index()\n",
"\n",
" # Holiday features dataset\n",
" holidays_events = pd.read_csv(\n",
" holidays_filepath,\n",
" dtype={\n",
" 'type': 'category',\n",
" 'locale': 'category',\n",
" 'locale_name': 'category',\n",
" 'description': 'category',\n",
" 'transferred': 'bool',\n",
" },\n",
" parse_dates=['date'],\n",
" infer_datetime_format=True,\n",
" )\n",
" holidays_events = holidays_events.set_index('date').to_period('D')\n",
"\n",
" # Test data id required for submission of forecast sales\n",
" df_test = pd.read_csv(\n",
" test_data_filepath,\n",
" dtype={\n",
" 'store_nbr': 'category',\n",
" 'family': 'category',\n",
" 'onpromotion': 'uint32',\n",
" },\n",
" parse_dates=['date'],\n",
" infer_datetime_format=True,\n",
" )\n",
" df_test['date'] = df_test.date.dt.to_period('D')\n",
" df_test = df_test.set_index(['store_nbr', 'family', 'date']).sort_index()\n",
" \n",
" # Dumping files to pickle\n",
" pickle.dump(train_sales, open(train_data_path, 'wb'))\n",
" pickle.dump(holidays_events, open(holidays_data_path, 'wb'))\n",
" \n",
" return path\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"load_data_op = func_to_container_op(load_data,packages_to_install = import_packages)"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"## Create features\n",
"\n",
"1. indicators for weekly seasons\n",
"2. Fourier features of order 4 for monthly seasons\n",
"3. Creating holiday features provided in the Store Sales Dataset"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"def create_features(path:str, train_data_path: comp.InputPath(), holidays_data_path: comp.InputPath(), \n",
" holidays_feat_path:comp.OutputPath(), data_path: comp.OutputPath('ApacheParquet'), \n",
" target_data_path: comp.OutputPath(), dp_feat_path: comp.OutputPath())->str:\n",
" \n",
" # Imports\n",
" import pandas as pd\n",
" from pyarrow import parquet\n",
" import pyarrow as pa\n",
" from statsmodels.tsa.deterministic import CalendarFourier, DeterministicProcess\n",
" from sklearn.preprocessing import OneHotEncoder\n",
" import pickle\n",
" \n",
" train_sales = pickle.load(open(train_data_path, 'rb'))\n",
" holidays_events = pickle.load(open(holidays_data_path, 'rb'))\n",
" \n",
" # National and regional holidays of Ecuador in the training set\n",
" # Holiday features\n",
" holidays = (\n",
" holidays_events\n",
" .query(\"locale in ['National', 'Regional']\")\n",
" .loc['2017':'2017-08-15', ['description']]\n",
" .assign(description=lambda x: x.description.cat.remove_unused_categories())\n",
" )\n",
" \n",
" # Create training data features\n",
" y = train_sales.unstack(['store_nbr', 'family']).loc[\"2017\"]\n",
"\n",
" # Using CalendarFourier to create fourier features \n",
" fourier = CalendarFourier(freq='M', order=4)\n",
"\n",
" # Using DeterministicProcess to create indicators for both \n",
" # weekly and monthly seasons\n",
" dp = DeterministicProcess(\n",
" index=y.index,\n",
" constant=True,\n",
" order=1,\n",
" seasonal=True, # weekly seasonality (indicators)\n",
" additional_terms=[fourier], # annual seasonality (fourier)\n",
" drop=True,\n",
" )\n",
"\n",
" # `in_sample` creates features for the dates given in the `index` argument\n",
" X = dp.in_sample()\n",
"\n",
" ohe = OneHotEncoder(sparse=False)\n",
"\n",
" X_holidays = pd.DataFrame(\n",
" ohe.fit_transform(holidays),\n",
" index=holidays.index,\n",
" columns=holidays.description.unique(),\n",
" )\n",
"\n",
" X_holidays = pd.get_dummies(holidays)\n",
"\n",
" # Join holiday features to training data\n",
" X_2 = X.join(X_holidays, on='date').fillna(0.0)\n",
" \n",
" table = pa.Table.from_pandas(X_2)\n",
" parquet.write_table(table, data_path)\n",
" \n",
" pickle.dump(X_holidays, open(holidays_feat_path, 'wb'))\n",
" pickle.dump(y, open(target_data_path, 'wb'))\n",
" pickle.dump(dp, open(dp_feat_path, 'wb'))\n",
" \n",
" return path"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"create_features_op = func_to_container_op(create_features,packages_to_install = import_packages)"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"## Train and evaluate the model"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"def train_and_evaluate_model(path:str, data_path: comp.InputPath('ApacheParquet'), target_data_path: comp.InputPath(), \n",
" model_path: comp.OutputPath())->str:\n",
" \n",
" import pandas as pd\n",
" from sklearn.linear_model import LinearRegression\n",
" from sklearn.model_selection import train_test_split\n",
" from sklearn.metrics import mean_absolute_error\n",
" from pyarrow import parquet\n",
" import pyarrow as pa\n",
" import pickle\n",
" \n",
" X_2 = parquet.read_pandas(data_path).to_pandas()\n",
" y = pickle.load(open(target_data_path, 'rb'))\n",
" \n",
" # Split the data to train and valid datasets\n",
" X_train, X_valid, y_train, y_valid = train_test_split(X_2, y, test_size=0.1, shuffle=False)\n",
"\n",
" # Train the model\n",
" model = LinearRegression(fit_intercept=False)\n",
" model.fit(X_train, y_train)\n",
"\n",
" # Get the training and valid data predictions\n",
" y_train_pred = pd.DataFrame(model.predict(X_train), index=X_train.index, columns=y.columns)\n",
" y_valid_pred = pd.DataFrame(model.predict(X_valid), index=X_valid.index, columns=y.columns)\n",
" # Evaluate the model using mean_squared_log_error\n",
" print(mean_absolute_error(y_valid, y_valid_pred))\n",
" \n",
" pickle.dump(model, open(model_path, 'wb'))\n",
" \n",
" return path"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"train_and_evaluate_model_op = func_to_container_op(train_and_evaluate_model,packages_to_install = import_packages)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"def forecast_sales(path:str, model_path: comp.InputPath(), holidays_feat_path: comp.InputPath(), \n",
" target_data_path: comp.InputPath(), dp_feat_path: comp.InputPath()):\n",
" \n",
" import pandas as pd\n",
" from statsmodels.tsa.deterministic import CalendarFourier, DeterministicProcess\n",
" import pickle\n",
" \n",
" model = pickle.load(open(model_path, 'rb'))\n",
" dp = pickle.load(open(dp_feat_path, 'rb'))\n",
" \n",
" X_holidays = pickle.load(open(holidays_feat_path, 'rb'))\n",
" y = pickle.load(open(target_data_path, 'rb'))\n",
" \n",
" # Create features for test set\n",
" # \"out of sample\" refers to times outside of the observation period of the training data.\n",
" # We are forecasting for next 16 days from the end of the training data date\n",
" test = dp.out_of_sample(steps=16)\n",
" test.index.name = 'date'\n",
" X_test = test.join(X_holidays, on='date').fillna(0.0)\n",
" y_forecast = pd.DataFrame(model.predict(X_test), index=X_test.index, columns=y.columns)\n",
" print(y_forecast)\n"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"forecast_sales_op = func_to_container_op(forecast_sales, packages_to_install = import_packages)"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"## Defining function that implements the pipeline"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"def kfp_pipeline():\n",
" \n",
" vop = kfp.dsl.VolumeOp(\n",
" name=\"create-volume\", \n",
" resource_name=\"store-sales-pvc\",\n",
" size=\"5Gi\",\n",
" modes = kfp.dsl.VOLUME_MODE_RWM\n",
" )\n",
" \n",
" download_task = download_data_op(\"/mnt/data/\").add_pvolumes({\"/mnt\": vop.volume}).add_pod_label(\"kaggle-secret\", \"true\")\n",
" load_data_task = load_data_op(download_task.output).add_pvolumes({\"/mnt\": vop.volume})\n",
" create_features_task = create_features_op(path = load_data_task.outputs['Output'],\n",
" train_data =load_data_task.outputs['train_data'], \n",
" holidays_data = load_data_task.outputs['holidays_data']\n",
" ).add_pvolumes({\"/mnt\": vop.volume})\n",
" train_and_evaluate_model_task = train_and_evaluate_model_op(path = create_features_task.outputs['Output'], \n",
" data = create_features_task.outputs['data'],\n",
" target_data = create_features_task.outputs['target_data'], \n",
" ).add_pvolumes({\"/mnt\": vop.volume})\n",
" forecast_sales_task = forecast_sales_op(path = train_and_evaluate_model_task.outputs['Output'],\n",
" model = train_and_evaluate_model_task.outputs['model'], \n",
" holidays_feat = create_features_task.outputs['holidays_feat'],\n",
" target_data = create_features_task.outputs['target_data'],\n",
" dp_feat = create_features_task.outputs['dp_feat'] \n",
" ).add_pvolumes({\"/mnt\": vop.volume})"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'train_data': {{pipelineparam:op=load-data;name=train_data}}, 'holidays_data': {{pipelineparam:op=load-data;name=holidays_data}}, 'Output': {{pipelineparam:op=load-data;name=Output}}, 'output': {{pipelineparam:op=load-data;name=Output}}}\n"
]
},
{
"data": {
"text/html": [
"<a href=\"/pipeline/#/experiments/details/6df864a5-13ed-447b-96d6-1bce286c6597\" target=\"_blank\" >Experiment details</a>."
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<a href=\"/pipeline/#/runs/details/db0cbcb3-9e78-4542-b4d5-7e061e170af7\" target=\"_blank\" >Run details</a>."
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"RunPipelineResult(run_id=db0cbcb3-9e78-4542-b4d5-7e061e170af7)"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Using kfp.Client() to run the pipeline from notebook itself\n",
"client = kfp.Client() # change arguments accordingly\n",
"\n",
"# Running the pipeline\n",
"client.create_run_from_pipeline_func(\n",
" kfp_pipeline,\n",
" arguments={\n",
" })"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"kubeflow_notebook": {
"autosnapshot": true,
"experiment": {
"id": "8b9ab23a-f26c-475d-baa7-1b4e6e26c816",
"name": "store-sales-forecast"
},
"experiment_name": "store-sales-forecast",
"katib_metadata": {
"algorithm": {
"algorithmName": "grid"
},
"maxFailedTrialCount": 3,
"maxTrialCount": 12,
"objective": {
"objectiveMetricName": "",
"type": "minimize"
},
"parallelTrialCount": 3,
"parameters": []
},
"katib_run": false,
"pipeline_description": "",
"pipeline_name": "",
"snapshot_volumes": true,
"steps_defaults": [
"label:access-ml-pipeline:true",
"label:access-rok:true"
],
"volume_access_mode": "rwm",
"volumes": [
{
"annotations": [],
"mount_point": "/home/jovyan",
"name": "store-sales-workspace-tx8b7",
"size": 20,
"size_type": "Gi",
"snapshot": false,
"type": "clone"
}
]
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
}
},
"nbformat": 4,
"nbformat_minor": 4
}