examples/h-and-m-fash-rec-kaggle-com.../h&m-fash-rec-kfp.ipynb

{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"# Kaggle Featured Prediction Competition: H&M Personalized Fashion Recommendations"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"In this [competition](https://www.kaggle.com/competitions/h-and-m-personalized-fashion-recommendations), product recommendations have to be done based on previous purchases. There's a whole range of data available including customer meta data, product meta data, and meta data that spans from simple data, such as garment type and customer age, to text data from product descriptions, to image data from garment images.\n",
"\n",
"In this notebook we will be working with implicit's ALS library for our recommender systems. Please do check out the [docs](https://benfred.github.io/implicit/index.html) for more information"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"Install the kfp package by uncommenting the below line and restarting the kernel. Do comment it out once the kernel is restarted"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Install the kfp \n",
"# !pip install kfp --upgrade "
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"Following are the imports required to build the pipeline and pass the data between components for building up the kubeflow pipeline"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import kfp\n",
"from kfp.components import func_to_container_op\n",
"import kfp.components as comp\n",
"from typing import NamedTuple"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"All the essential imports required in a pipeline component are put together in a list which then is passed on to each pipeline component. Though this might not be efficient when you are dealing with lot of packages, so in cases with many packages and dependencies you can go for docker image which then can be passed to each pipeline component"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import_packages = ['pandas', 'sklearn', 'implicit', 'kaggle', 'numpy', 'pyarrow']"
]
},
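{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"As an alternative to `packages_to_install`, each component can be given a prebuilt Docker image. The sketch below assumes a hypothetical image `my-registry/hm-recsys:latest` that already contains the packages listed above; it is only an illustration and is not used by the pipeline in this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: pass a prebuilt base image to a component instead of installing\n",
"# packages at container start-up. The image name is a placeholder; build and\n",
"# push your own image containing the packages from import_packages.\n",
"\n",
"def example_component(msg: str) -> str:\n",
"    return msg\n",
"\n",
"example_component_op = func_to_container_op(\n",
"    example_component,\n",
"    base_image='my-registry/hm-recsys:latest',  # placeholder image name\n",
")"
]
},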
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"In the following implementation of kubeflow pipeline we are making use of [lightweight python function components](https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/) to build up the pipeline. The data is passed between component instances(tasks) using InputPath and OutputPath. Different ways of storing and passing data between the pipelines have been explored in the following notebook."
]
},
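{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"Before the real components, here is a minimal self-contained sketch (not part of the pipeline) of two lightweight components that pass a small file between them via `OutputPath` and `InputPath`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Minimal sketch of file-based data passing between lightweight components.\n",
"# KFP materializes OutputPath/InputPath parameters as local file paths inside\n",
"# the component containers and wires the files between tasks.\n",
"\n",
"def write_greeting(greeting_path: comp.OutputPath(str)):\n",
"    with open(greeting_path, 'w') as f:\n",
"        f.write('hello from the first component')\n",
"\n",
"def read_greeting(greeting_path: comp.InputPath(str)) -> str:\n",
"    with open(greeting_path, 'r') as f:\n",
"        return f.read()\n",
"\n",
"write_greeting_op = func_to_container_op(write_greeting)\n",
"read_greeting_op = func_to_container_op(read_greeting)\n",
"\n",
"def toy_pipeline():\n",
"    write_task = write_greeting_op()\n",
"    # The output/input name 'greeting' is the parameter name with the\n",
"    # '_path' suffix stripped, as used in the main pipeline further below.\n",
"    read_task = read_greeting_op(greeting=write_task.outputs['greeting'])"
]
},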
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"The pipeline is divided into five components\n",
"\n",
" 1. Download data from Kaggle\n",
" 2. Load and preprocess the data\n",
" 3. Creating sparse matrix\n",
" 4. Train model\n",
" 5. Predictions"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"### Download the data from Kaggle"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"Follow the prerequisites information in the Github README.md on how to create a secret for our credentials and mounting them to our pod using a pod-default resource. Once you have the secret mounted, you can use it to acccess the Username and key to download the files you need from kaggle. For the following competition, we have downloaded the files required instead of downloading the whole thing."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def download_kaggle_dataset(path:str)->str:\n",
" \n",
" import os\n",
"\n",
" # Retrieve the credentials from the secret mounted and \n",
" # bring it onto our working environment\n",
" with open('/secret/kaggle/KAGGLE_KEY', 'r') as file:\n",
" kaggle_key = file.read().rstrip()\n",
" with open('/secret/kaggle/KAGGLE_USERNAME', 'r') as file:\n",
" kaggle_user = file.read().rstrip()\n",
" os.environ['KAGGLE_USERNAME'] = kaggle_user \n",
" os.environ['KAGGLE_KEY'] = kaggle_key\n",
"\n",
" os.chdir(os.getcwd())\n",
" os.system(\"mkdir \" + path)\n",
" os.chdir(path)\n",
" \n",
" # Using Kaggle Public API to download the datasets\n",
" import kaggle \n",
" from kaggle.api.kaggle_api_extended import KaggleApi\n",
" \n",
" api = KaggleApi()\n",
" api.authenticate()\n",
" \n",
" # Download the required files individually. You can also choose to download the entire dataset if you want to work with images as well. \n",
" api.competition_download_file('h-and-m-personalized-fashion-recommendations','customers.csv')\n",
" api.competition_download_file('h-and-m-personalized-fashion-recommendations','transactions_train.csv')\n",
" api.competition_download_file('h-and-m-personalized-fashion-recommendations','articles.csv')\n",
" api.competition_download_file('h-and-m-personalized-fashion-recommendations','sample_submission.csv') \n",
" \n",
" return path \n",
" \n"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"download_data_op = func_to_container_op(download_kaggle_dataset, packages_to_install = import_packages)"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"### Load and Preprocess the data"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"def load_and_preprocess_data(path:str, preprocess_data_path: comp.OutputPath('ApacheParquet'))->NamedTuple('Outputs', [('data_path',str),('int_list', list)]):\n",
" \n",
" \n",
" import pandas as pd\n",
" import os\n",
" from zipfile import ZipFile \n",
" from pyarrow import parquet\n",
" import pyarrow as pa\n",
" \n",
" # Moving to current working directory and creating a new directory\n",
" os.chdir(os.getcwd())\n",
" print(os.listdir(path))\n",
" os.chdir(path)\n",
" \n",
" # Extracting all files from individual zip files\n",
" zipfile1 = ZipFile('customers.csv.zip', 'r')\n",
" zipfile1.extract(\"customers.csv\")\n",
" zipfile1.close()\n",
" \n",
" zipfile2 = ZipFile('transactions_train.csv.zip', 'r')\n",
" zipfile2.extract(\"transactions_train.csv\")\n",
" zipfile2.close()\n",
" \n",
" zipfile3 = ZipFile('articles.csv.zip', 'r')\n",
" zipfile3.extract(\"articles.csv\")\n",
" zipfile3.close()\n",
" \n",
" zipfile4 = ZipFile('sample_submission.csv.zip', 'r')\n",
" zipfile4.extract(\"sample_submission.csv\")\n",
" zipfile4.close()\n",
" \n",
" # Converting to pandas dataframe \n",
" customer_data = pd.read_csv(\"customers.csv\")\n",
" article_data = pd.read_csv(\"articles.csv\")\n",
" train_data = pd.read_csv(\"transactions_train.csv\") \n",
" \n",
" # create a new purchase count column that would gives us count of every article bought by the customers\n",
" X = train_data.groupby(['customer_id', 'article_id'])['article_id'].count().reset_index(name = \"purchase_count\") \n",
"\n",
" # Getting unique number of customers and articles using the customer and article metadata data files\n",
" unique_customers = customer_data['customer_id'].unique()\n",
" unique_articles = article_data['article_id'].unique()\n",
" \n",
" # length of the customers and articles\n",
" n_customers = len(unique_customers)\n",
" n_articles = len(unique_articles)\n",
"\n",
" # Create a mapping for customer_id to convert it from an object column to an int column for the sparse matrix creation\n",
" customer_id_dict = {unique_customers[i]:i for i in range(len(unique_customers))}\n",
" reverse_customer_id_dict = {i:unique_customers[i] for i in range(len(unique_customers))} \n",
" numeric_cus_id = []\n",
" for i in range(len(X['customer_id'])):\n",
" numeric_cus_id.append(customer_id_dict.get(X['customer_id'][i]))\n",
" X['customer_id'] = numeric_cus_id\n",
"\n",
" # Create a mapping for article_id so that the sparse matrix creation doesn't get large enough due to long int values of article_ids\n",
" article_id_dict = {unique_articles[i]:i for i in range(len(unique_articles))}\n",
" rev_art_id_dict = {i:int(unique_articles[i]) for i in range(len(unique_articles))}\n",
" numeric_art_id = []\n",
" for i in range(len(X['article_id'])):\n",
" numeric_art_id.append(article_id_dict.get(X['article_id'][i]))\n",
" X['article_id'] = numeric_art_id\n",
" \n",
" # Convert from pandas to Arrow\n",
" table = pa.Table.from_pandas(X)\n",
" parquet.write_table(table, preprocess_data_path)\n",
" \n",
" values=[n_customers, n_articles]\n",
" \n",
" return (path, values)\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"load_and_preprocess_data_op = func_to_container_op(load_and_preprocess_data,packages_to_install = import_packages)"
]
},
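{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"`transactions_train.csv` has tens of millions of rows, so the Python loops above that translate the IDs can be slow. A vectorized alternative using `Series.map` does the same translation; the sketch below uses a small made-up frame and is not part of the pipeline."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: vectorized id translation with pandas.Series.map instead of a\n",
"# Python loop. The toy data below is made up purely for illustration.\n",
"import pandas as pd\n",
"\n",
"toy = pd.DataFrame({\n",
"    'customer_id': ['c1', 'c2', 'c1'],\n",
"    'article_id': [111, 222, 111],\n",
"    'purchase_count': [2, 1, 3],\n",
"})\n",
"toy_customer_id_dict = {'c1': 0, 'c2': 1}\n",
"toy_article_id_dict = {111: 0, 222: 1}\n",
"\n",
"# Equivalent to the loops in load_and_preprocess_data, but vectorized\n",
"toy['customer_id'] = toy['customer_id'].map(toy_customer_id_dict)\n",
"toy['article_id'] = toy['article_id'].map(toy_article_id_dict)\n",
"print(toy)"
]
},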
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"### Creating sparse matrix"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"def sparse_matrix_creation(data_path:str, list_val: list, file_path: comp.InputPath('ApacheParquet'), sparse_path: comp.OutputPath())->str:\n",
" \n",
" import pandas as pd\n",
" from pyarrow import parquet\n",
" import pyarrow as pa\n",
" import scipy.sparse as sparse\n",
" from scipy.sparse import coo_matrix\n",
" from pathlib import Path\n",
" import pickle\n",
" \n",
" X = parquet.read_pandas(file_path).to_pandas()\n",
" \n",
" n_customers = list_val[0]\n",
" n_articles = list_val[1]\n",
"\n",
" # Constructing sparse matrices for alternating least squares algorithm \n",
" sparse_user_item_coo = sparse.coo_matrix((X.purchase_count, (X.customer_id, X.article_id)), shape = (n_customers, n_articles))\n",
" sparse_user_item_csr = sparse.csr_matrix((X['purchase_count'], (X['customer_id'], X['article_id'])), shape = (n_customers, n_articles))\n",
"\n",
" pickle.dump(sparse_user_item_csr, open(sparse_path, 'wb'))\n",
" \n",
" return data_path "
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"sparse_matrix_creation_op = func_to_container_op(sparse_matrix_creation, packages_to_install = import_packages)"
]
},
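{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"To illustrate what this component builds, here is a tiny made-up example of the same construction: rows are (mapped) customers, columns are (mapped) articles, and the values are purchase counts."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: a 3-customer x 2-article purchase-count matrix in CSR form.\n",
"import scipy.sparse as sparse\n",
"\n",
"purchase_count = [2, 1, 3]   # how often each (customer, article) pair occurs\n",
"customer_idx = [0, 1, 2]     # row indices (mapped customer ids)\n",
"article_idx = [0, 1, 0]      # column indices (mapped article ids)\n",
"\n",
"toy_csr = sparse.csr_matrix((purchase_count, (customer_idx, article_idx)), shape=(3, 2))\n",
"print(toy_csr.toarray())\n",
"# [[2 0]\n",
"#  [0 1]\n",
"#  [3 0]]"
]
},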
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"### Train the Model"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"def train_model(path:str, sparse_matrix_path: comp.InputPath(), model_path: comp.OutputPath())->str:\n",
" \n",
" import implicit\n",
" import pandas as pd\n",
" from pyarrow import parquet\n",
" import pyarrow as pa\n",
" import scipy.sparse as sparse\n",
" import pickle\n",
" \n",
" # Loading the sparse user item matrix from pickle\n",
" sparse_user_item_csr = pickle.load(open(sparse_matrix_path, 'rb'))\n",
" \n",
" # parameters for the model\n",
" als_params = dict(\n",
" factors = 200, # number of latent factors - try between 50 to 1000\n",
" regularization = 0.01, # regularization factor - try between 0.001 to 0.2\n",
" iterations = 5, # iterations - try between 2 to 100\n",
" )\n",
"\n",
" # initialize a model\n",
" model = implicit.als.AlternatingLeastSquares(**als_params)\n",
"\n",
" # train the model on a sparse matrix of user/item/confidence weights \n",
" model.fit(sparse_user_item_csr)\n",
" \n",
" pickle.dump(model, open(model_path, 'wb'))\n",
" \n",
" return path"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"train_model_op = func_to_container_op(train_model, packages_to_install = import_packages)"
]
},
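{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"The pipeline trains on all interactions and does not evaluate the model. For an offline sanity check you can use the evaluation helpers that ship with `implicit`; the sketch below (not part of the pipeline, and assuming a CSR user-item matrix like the one built earlier) holds out a share of the interactions and reports ranking metrics at k."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: offline evaluation with implicit's evaluation helpers, assuming\n",
"# sparse_user_item_csr is a scipy CSR user-item matrix like the one the\n",
"# pipeline builds. Not executed as part of the pipeline.\n",
"import implicit\n",
"from implicit.evaluation import train_test_split, ranking_metrics_at_k\n",
"\n",
"def evaluate_als(sparse_user_item_csr, k=12):\n",
"    # Hold out 20% of the interactions for testing\n",
"    train, test = train_test_split(sparse_user_item_csr, train_percentage=0.8)\n",
"    model = implicit.als.AlternatingLeastSquares(factors=200, regularization=0.01, iterations=5)\n",
"    model.fit(train)\n",
"    # Returns a dict with precision, map, ndcg and auc at k\n",
"    return ranking_metrics_at_k(model, train, test, K=k)"
]
},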
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"### Predictions"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"def predictions(test_path:str, model_path : comp.InputPath(), sparse_path: comp.InputPath()):\n",
" \n",
" import pandas as pd\n",
" import os\n",
" from zipfile import ZipFile \n",
" import pickle\n",
" from pyarrow import parquet\n",
" import pyarrow as pa\n",
" import scipy.sparse as sparse\n",
" \n",
" sparse_user_item_csr = pickle.load(open(sparse_path, 'rb'))\n",
" model = pickle.load(open(model_path, 'rb'))\n",
" \n",
" os.chdir(os.getcwd())\n",
" print(os.listdir(test_path))\n",
" os.chdir(test_path)\n",
"\n",
" # Converting to pandas dataframe \n",
" customer_data = pd.read_csv(\"customers.csv\")\n",
" article_data = pd.read_csv(\"articles.csv\") \n",
" test_data = pd.read_csv(\"sample_submission.csv\")\n",
" \n",
" # Getting unique number of customers and articles using the customer and article metadata data files\n",
" unique_customers = customer_data['customer_id'].unique()\n",
" unique_articles = article_data['article_id'].unique()\n",
" \n",
" # length of the customers and articles\n",
" n_customers = len(unique_customers)\n",
" n_articles = len(unique_articles)\n",
" \n",
" # Create a mapping for customer_id\n",
" customer_id_dict = {unique_customers[i]:i for i in range(len(unique_customers))}\n",
"\n",
" # Create a reverse mapping for article_id\n",
" reverse_article_id_dict = {i:int(unique_articles[i]) for i in range(len(unique_articles))}\n",
"\n",
" predictions=[]\n",
" count = 0\n",
" for cust_id in test_data.customer_id:\n",
" cust_id = customer_id_dict.get(cust_id)\n",
" if(cust_id!=None): \n",
" recommendations = model.recommend(cust_id, sparse_user_item_csr[cust_id],10)\n",
" result=[]\n",
" for i in range(len(recommendations[0])):\n",
" val = reverse_article_id_dict.get(recommendations[0][i])\n",
" result.append(val) \n",
" predictions.append(result)\n",
" \n",
" test_data['prediction'] = predictions\n",
" test_data"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"prediction_op = func_to_container_op(predictions, packages_to_install = import_packages)"
]
},
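{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"The component above leaves the recommendations as lists of integer article IDs. The Kaggle submission expects one space-separated string of article IDs per customer, and in `sample_submission.csv` the IDs appear as zero-padded 10-character strings. A sketch of that formatting step, assuming a `test_data` frame with a `prediction` column of lists as built above, could look like this; it is not part of the pipeline."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: turn lists of integer article ids into the space-separated,\n",
"# zero-padded strings used in sample_submission.csv, then write the file.\n",
"# Assumes test_data has a 'prediction' column of lists as built above.\n",
"\n",
"def format_submission(test_data, out_path='submission.csv'):\n",
"    test_data = test_data.copy()\n",
"    test_data['prediction'] = test_data['prediction'].apply(\n",
"        lambda ids: ' '.join('{:010d}'.format(a) for a in ids)\n",
"    )\n",
"    test_data.to_csv(out_path, index=False)\n",
"    return test_data"
]
},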
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"### Defining function that implements the pipeline"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"def kfp_pipeline():\n",
" \n",
" vop = kfp.dsl.VolumeOp(\n",
" name=\"create-volume\", \n",
" resource_name=\"mypvc\",\n",
" size=\"10Gi\",\n",
" modes = kfp.dsl.VOLUME_MODE_RWM\n",
" )\n",
" \n",
" download_task = download_data_op(\"/mnt/data/\").add_pvolumes({\"/mnt\": vop.volume}).add_pod_label(\"kaggle-secret\", \"true\")\n",
" load_and_preprocess_data_task = load_and_preprocess_data_op(download_task.output).add_pvolumes({\"/mnt\": vop.volume})\n",
" sparse_matrix_task = sparse_matrix_creation_op(data_path =load_and_preprocess_data_task.outputs['data_path'], \n",
" file = load_and_preprocess_data_task.outputs['preprocess_data'], \n",
" list_val = load_and_preprocess_data_task.outputs['int_list']).add_pvolumes({\"/mnt\": vop.volume})\n",
" train_model_task = train_model_op(path = sparse_matrix_task.outputs['Output'], \n",
" sparse_matrix = sparse_matrix_task.outputs['sparse']).add_pvolumes({\"/mnt\": vop.volume})\n",
" prediction_task = prediction_op(test_path = train_model_task.outputs['Output'],\n",
" model = train_model_task.outputs['model'], \n",
" sparse = sparse_matrix_task.outputs['sparse']).add_pvolumes({\"/mnt\": vop.volume})\n",
" \n",
" "
]
},
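{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"Instead of (or in addition to) launching a run directly from the notebook as in the next cell, the pipeline function can be compiled into a package that you upload through the Kubeflow Pipelines UI. A minimal sketch:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: compile the pipeline to a reusable package that can be uploaded\n",
"# via the Kubeflow Pipelines UI or the kfp CLI.\n",
"import kfp.compiler\n",
"\n",
"kfp.compiler.Compiler().compile(kfp_pipeline, 'hm_fashion_rec_pipeline.yaml')"
]
},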
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<a href=\"/pipeline/#/experiments/details/e844a889-33e9-43b1-a374-b6ce676c17db\" target=\"_blank\" >Experiment details</a>."
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<a href=\"/pipeline/#/runs/details/61cfeaf5-5caa-49f2-89c1-c3f3e9e8d5b5\" target=\"_blank\" >Run details</a>."
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"RunPipelineResult(run_id=61cfeaf5-5caa-49f2-89c1-c3f3e9e8d5b5)"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Using kfp.Client() to run the pipeline from notebook itself\n",
"client = kfp.Client() # change arguments accordingly\n",
"\n",
"# Running the pipeline\n",
"client.create_run_from_pipeline_func(\n",
" kfp_pipeline,\n",
" arguments={\n",
" })"
]
},
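{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"The cell above returns as soon as the run is submitted. If you want the notebook to block until the run has finished (for example before inspecting its outputs), you can wait on the run id. The sketch below assumes the `RunPipelineResult` from the previous cell was stored in a variable such as `run_result`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: block until a submitted run finishes. run_result is assumed to be\n",
"# the value returned by client.create_run_from_pipeline_func above.\n",
"\n",
"def wait_for_run(client, run_result, timeout_seconds=3600):\n",
"    run = client.wait_for_run_completion(run_result.run_id, timeout_seconds)\n",
"    print('Run finished with status:', run.run.status)\n",
"    return run"
]
},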
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"kubeflow_notebook": {
"autosnapshot": true,
"experiment": {
"id": "",
"name": ""
},
"experiment_name": "",
"katib_metadata": {
"algorithm": {
"algorithmName": "grid"
},
"maxFailedTrialCount": 3,
"maxTrialCount": 12,
"objective": {
"objectiveMetricName": "",
"type": "minimize"
},
"parallelTrialCount": 3,
"parameters": []
},
"katib_run": false,
"pipeline_description": "",
"pipeline_name": "",
"snapshot_volumes": true,
"steps_defaults": [
"label:access-ml-pipeline:true",
"label:access-rok:true"
],
"volume_access_mode": "rwm",
"volumes": [
{
"annotations": [],
"mount_point": "/home/jovyan",
"name": "hm-fash-workspace-fhh9d",
"size": 50,
"size_type": "Gi",
"snapshot": false,
"type": "clone"
}
]
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
}
},
"nbformat": 4,
"nbformat_minor": 4
}