
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"# JPX Tokyo Stock Exchange Kubeflow Pipeline\n",
"\n",
"In this [Kaggle competition](https://www.kaggle.com/competitions/jpx-tokyo-stock-exchange-prediction/overview) \n",
"\n",
">Japan Exchange Group, Inc. (JPX) is a holding company operating one of the largest stock exchanges in the world, Tokyo Stock Exchange (TSE), and derivatives exchanges Osaka Exchange (OSE) and Tokyo Commodity Exchange (TOCOM). JPX is hosting this competition and is supported by AI technology company AlpacaJapan Co.,Ltd.\n",
"\n",
"> In this competition, you will model real future returns of around 2,000 stocks. The competition will involve building portfolios from the stocks eligible for predictions. The stocks are ranked from highest to lowest expected returns and they are evaluated on the difference in returns between the top and bottom 200 stocks."
]
},
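{
"cell_type": "markdown",
"metadata": {},
"source": [
"To build intuition for that evaluation, the sketch below reimplements the daily spread return locally. It is an illustration only, not the official scoring code; it assumes a dataframe with per-day `Rank` and `Target` columns, and the linear 2-to-1 weighting over each 200-stock leg follows the competition's published metric."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A minimal sketch of the competition's daily spread-return metric\n",
"# (illustrative only; assumes columns 'Date', 'Rank', 'Target')\n",
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"def daily_spread_return(day_df, portfolio_size=200, top_weight=2.0):\n",
"    weights = np.linspace(top_weight, 1, portfolio_size)\n",
"    ranked = day_df.sort_values('Rank')\n",
"    purchase = (ranked['Target'][:portfolio_size] * weights).sum() / weights.mean()\n",
"    short = (ranked['Target'][::-1][:portfolio_size] * weights).sum() / weights.mean()\n",
"    return purchase - short\n",
"\n",
"def sharpe_of_spread_returns(df):\n",
"    spreads = df.groupby('Date').apply(daily_spread_return)\n",
"    return spreads.mean() / spreads.std()"
]
},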
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Install relevant libraries\n",
"\n",
"\n",
">Update pip `pip install --user --upgrade pip`\n",
"\n",
">Install and upgrade kubeflow sdk `pip install kfp --upgrade --user --quiet`\n",
"\n",
"You may need to restart your notebook kernel after installing the kfp sdk"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: pip in /usr/local/lib/python3.6/dist-packages (21.3.1)\n"
]
}
],
"source": [
"!pip install --user --upgrade pip"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"!pip install kfp --upgrade --user --quiet"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Name: kfp\n",
"Version: 1.8.11\n",
"Summary: KubeFlow Pipelines SDK\n",
"Home-page: https://github.com/kubeflow/pipelines\n",
"Author: The Kubeflow Authors\n",
"Author-email: \n",
"License: UNKNOWN\n",
"Location: /home/jovyan/.local/lib/python3.6/site-packages\n",
"Requires: absl-py, click, cloudpickle, dataclasses, Deprecated, docstring-parser, fire, google-api-python-client, google-auth, google-cloud-storage, jsonschema, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, pydantic, PyYAML, requests-toolbelt, strip-hints, tabulate, typer, typing-extensions, uritemplate\n",
"Required-by: kubeflow-kale\n"
]
}
],
"source": [
"# confirm the kfp sdk\n",
"! pip show kfp"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"tags": [
"imports"
]
},
"outputs": [],
"source": [
"import kfp\n",
"import kfp.components as comp\n",
"import kfp.dsl as dsl\n",
"from kfp.components import InputPath, OutputPath\n",
"from typing import NamedTuple"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"# Kubeflow pipeline component creation\n",
"\n",
"## Download and load the dataset"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# load data step\n",
"def load_data(dataset: str, data_path: OutputPath(str)):\n",
" \n",
" # install the necessary libraries\n",
" import os, sys, subprocess, zipfile, pickle;\n",
" subprocess.run([\"python\", \"-m\", \"pip\", \"install\", \"--upgrade\", \"pip\"])\n",
" subprocess.run([sys.executable, '-m', 'pip', 'install','pandas'])\n",
" subprocess.run([sys.executable, '-m', 'pip', 'install','kaggle'])\n",
" \n",
" # import libraries\n",
" import pandas as pd\n",
"\n",
" # setup kaggle environment for data download\n",
" with open('/secret/kaggle-secret/password', 'r') as file:\n",
" kaggle_key = file.read().rstrip()\n",
" with open('/secret/kaggle-secret/username', 'r') as file:\n",
" kaggle_user = file.read().rstrip()\n",
" \n",
" os.environ['KAGGLE_USERNAME'], os.environ['KAGGLE_KEY'] = kaggle_user, kaggle_key\n",
" \n",
" # create data_path directory\n",
" if not os.path.exists(data_path):\n",
" os.makedirs(data_path)\n",
" \n",
" # download kaggle's jpx-tokyo-stock-exchange-prediction data\n",
" subprocess.run([\"kaggle\",\"competitions\", \"download\", \"-c\", dataset])\n",
" \n",
" # extract jpx-tokyo-stock-exchange-prediction.zip to data_path\n",
" with zipfile.ZipFile(f\"{dataset}.zip\",\"r\") as zip_ref:\n",
" zip_ref.extractall(data_path)\n",
" \n",
" # read train_files/stock_prices.csv\n",
" df_prices = pd.read_csv(f\"{data_path}/train_files/stock_prices.csv\", parse_dates=['Date'])\n",
" \n",
" # Save the loaded data as a pickle file to be used by the tranform_data component.\n",
" with open(f'{data_path}/df_prices', 'wb') as f:\n",
" pickle.dump(df_prices, f)\n",
"\n",
" \n",
" return(print('Done!'))"
]
},
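{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because lightweight components are plain Python functions, `load_data` can be smoke-tested outside the pipeline. The call below is a sketch: it assumes the Kaggle credential files exist at `/secret/kaggle-secret/` (in-cluster they are mounted via the `kaggle-secret` pod label added later) and will fail without them."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional local smoke test (sketch; requires the mounted Kaggle secrets):\n",
"# load_data(\"jpx-tokyo-stock-exchange-prediction\", \"/tmp/jpx-data\")"
]
},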
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Transform data"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"# transform data step\n",
"\n",
"def transform_data(data_path: InputPath(str), \n",
" transform_data_path: OutputPath(str)):\n",
" \n",
" # install the necessary libraries\n",
" import sys, subprocess;\n",
" subprocess.run([\"python\", \"-m\", \"pip\", \"install\", \"--upgrade\", \"pip\"])\n",
" subprocess.run([sys.executable, '-m', 'pip', 'install','pandas'])\n",
" subprocess.run([sys.executable, '-m', 'pip', 'install','scipy'])\n",
" \n",
" # import Libraries\n",
" import os, pickle;\n",
" import pandas as pd\n",
" import numpy as np\n",
" from scipy import stats\n",
" \n",
" # load the df_prices data from load_data_path\n",
" with open(f'{data_path}/df_prices', 'rb') as f:\n",
" df_prices = pickle.load(f)\n",
"\n",
" # sort data by 'Date' and 'SecuritiesCode'\n",
" df_prices.sort_values(by=['Date','SecuritiesCode'], inplace=True)\n",
"\n",
" # filter out data with less than 2000 stock counts in a day\n",
" # dates before 2020-12-23 all have stock counts less than 2000\n",
" # This is done to work with consistent data \n",
" df_prices = df_prices[(df_prices[\"Date\"]>=\"2020-12-23\")]\n",
"\n",
" df_prices = df_prices.reset_index(drop=True)\n",
" \n",
" # calculate z-scores of `df`for outlier removal\n",
" z_scores = stats.zscore(df_prices[['Open', 'High', 'Low', 'Close','Volume']], nan_policy='omit')\n",
" abs_z_scores = np.abs(z_scores)\n",
" filtered_entries = (abs_z_scores < 3).all(axis=1)\n",
" df_zscore = df_prices[filtered_entries]\n",
" df_zscore = df_zscore.reset_index(drop=True)\n",
" \n",
" #creating the transform_data_path\n",
" os.makedirs(transform_data_path, exist_ok = True)\n",
" \n",
" #Save the df_zscore data as a pickle file to be used by the feature_engineering component.\n",
" with open(f'{transform_data_path}/df_zscore', 'wb') as f:\n",
" pickle.dump(df_zscore, f)\n",
" \n",
" return(print('Done!'))"
]
},
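{
"cell_type": "markdown",
"metadata": {},
"source": [
"To see what the z-score filter does, here is a toy example on synthetic prices: any row whose absolute z-score exceeds 3 in one of the five columns is dropped. Note that with very few rows a z-score can never exceed 3, so the toy frame needs a dozen rows for the outlier to register."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Toy illustration of the same z-score outlier filter (synthetic data)\n",
"import numpy as np\n",
"import pandas as pd\n",
"from scipy import stats\n",
"\n",
"vals = [10.0] * 11 + [500.0]  # eleven normal rows plus one extreme outlier\n",
"toy = pd.DataFrame({c: vals for c in ['Open', 'High', 'Low', 'Close', 'Volume']})\n",
"z = np.abs(stats.zscore(toy, nan_policy='omit'))\n",
"print(toy[(z < 3).all(axis=1)])  # the last row (|z| ~ 3.3) is filtered out"
]
},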
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"## Feature Engineering"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"# feature engineering step\n",
"\n",
"def feature_engineering(transform_data_path: InputPath(str), \n",
" feat_eng_path: OutputPath(str)):\n",
" \n",
" # install the necessary libraries\n",
" import sys, subprocess;\n",
" subprocess.run([\"python\", \"-m\", \"pip\", \"install\", \"--upgrade\", \"pip\"])\n",
" subprocess.run([sys.executable, '-m', 'pip', 'install','pandas'])\n",
" subprocess.run([sys.executable, '-m', 'pip', 'install','tqdm'])\n",
" \n",
" # import Library\n",
" import os, pickle;\n",
" import numpy as np\n",
" import pandas as pd\n",
" from tqdm import tqdm\n",
"\n",
" # loading the df_zscore data\n",
" with open(f'{transform_data_path}/df_zscore', 'rb') as f:\n",
" df_zscore = pickle.load(f)\n",
" \n",
" def feat_eng(df, features):\n",
"\n",
" for i in tqdm(range(1, 6)):\n",
" # creating lag features\n",
" tmp = df[features].shift(i)\n",
" tmp.columns = [c + f'_next_shift_{i}' for c in tmp.columns]\n",
" df = pd.concat([df, tmp], sort=False, axis=1)\n",
"\n",
" for i in tqdm(range(1, 6)):\n",
" df[f'weighted_vol_price_{i}'] = np.log(df[f'Volume_next_shift_{i}'] * df[[col for col in df if col.endswith(f'next_shift_{i}')][:-1]].apply(np.mean, axis=1) + 1.0)\n",
"\n",
" # feature engineering\n",
" df['weighted_vol_price'] = np.log(df['Volume'] * (np.mean(df[features[:-1]], axis=1)) + 1.0)\n",
" df['BOP'] = (df['Open']-df['Close'])/(df['High']-df['Low'])\n",
" df['HL'] = df['High'] - df['Low']\n",
" df['OC'] = df['Close'] - df['Open']\n",
" df['OHLCstd'] = df[['Open','Close','High','Low']].std(axis=1)\n",
" \n",
" # replace inf with nan\n",
" df.replace([np.inf, -np.inf], np.nan, inplace=True)\n",
" \n",
" # datetime features\n",
" df['Date'] = pd.to_datetime(df['Date'])\n",
" df['Day'] = df['Date'].dt.weekday.astype(np.int32)\n",
" df[\"dayofyear\"] = df['Date'].dt.dayofyear\n",
" df[\"is_weekend\"] = df['Day'].isin([5, 6])\n",
" df[\"weekofyear\"] = df['Date'].dt.weekofyear\n",
" df[\"month\"] = df['Date'].dt.month\n",
" df[\"season\"] = (df[\"month\"]%12 + 3)//3\n",
" \n",
" # fill nan values\n",
" df = df.fillna(0)\n",
" return df\n",
" \n",
" new_feats = feat_eng(df_zscore, ['High', 'Low', 'Open', 'Close', 'Volume'])\n",
" new_feats['Target'] = df_zscore['Target']\n",
" \n",
" # creating the feat_eng_path\n",
" os.makedirs(feat_eng_path, exist_ok = True)\n",
" \n",
" # save the feature engineered data as a pickle file to be used by the modeling component.\n",
" with open(f'{feat_eng_path}/new_feats', 'wb') as f:\n",
" pickle.dump(new_feats, f)\n",
" \n",
" return(print('Done!')) "
]
},
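{
"cell_type": "markdown",
"metadata": {},
"source": [
"The lag features above are plain `shift` operations, as the toy example below shows. One caveat worth noting: `feat_eng` shifts the full frame, which is sorted by `Date` then `SecuritiesCode`, so at stock boundaries a lag can pick up a different security's row; grouping by `SecuritiesCode` before shifting would avoid that."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Toy illustration of the lag-feature pattern used in feat_eng\n",
"import pandas as pd\n",
"\n",
"s = pd.DataFrame({'Close': [100, 102, 101, 105]})\n",
"for i in range(1, 3):\n",
"    s[f'Close_next_shift_{i}'] = s['Close'].shift(i)\n",
"print(s)  # row t carries the Close of rows t-1 and t-2 (NaN where unavailable)"
]
},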
{
"cell_type": "markdown",
"metadata": {
"papermill": {
"duration": 0.01421,
"end_time": "2022-04-17T07:17:13.396620",
"exception": false,
"start_time": "2022-04-17T07:17:13.382410",
"status": "completed"
},
"tags": []
},
"source": [
"## Modelling\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"# modeling step\n",
"\n",
"def modeling(feat_eng_path: InputPath(str), \n",
" model_path: OutputPath(str)):\n",
" \n",
" # install the necessary libraries\n",
" import sys, subprocess;\n",
" subprocess.run([\"python\", \"-m\", \"pip\", \"install\", \"--upgrade\", \"pip\"])\n",
" subprocess.run([sys.executable, '-m', 'pip', 'install','pandas'])\n",
" subprocess.run([sys.executable, '-m', 'pip', 'install','lightgbm'])\n",
" \n",
" # import Library\n",
" import os, pickle, joblib;\n",
" from lightgbm import LGBMRegressor\n",
"\n",
" #loading the new_feats data\n",
" with open(f'{feat_eng_path}/new_feats', 'rb') as f:\n",
" new_feats = pickle.load(f)\n",
" \n",
" # columns to be used for modelling.\n",
" feats = ['Date','SecuritiesCode', 'Open', 'High', 'Low', 'Close', 'Volume',\n",
" 'weighted_vol_price_1', 'weighted_vol_price_2', 'weighted_vol_price_3',\n",
" 'weighted_vol_price', 'BOP', 'HL', 'OC', 'OHLCstd', 'Day', 'dayofyear',\n",
" 'is_weekend', 'weekofyear', 'month', 'season']\n",
" \n",
" # transform date to int\n",
" new_feats['Date'] = new_feats['Date'].dt.strftime(\"%Y%m%d\").astype(int)\n",
" \n",
" # split data into valid for validation and train for model training\n",
" valid = new_feats[(new_feats['Date'] >= 20211111)].copy()\n",
" train = new_feats[(new_feats['Date'] < 20211111)].copy()\n",
"\n",
" #creating the model_path directory\n",
" os.makedirs(model_path, exist_ok = True)\n",
" \n",
" # model parameter\n",
" params = {\n",
" 'n_estimators': 100,\n",
" 'verbose' : 2,\n",
" 'random_state': 1,\n",
" 'learning_rate': 0.379687157316759}\n",
" \n",
" # model initialization\n",
" model = LGBMRegressor(**params)\n",
"\n",
"\n",
" X = train[feats]\n",
" y = train[\"Target\"]\n",
"\n",
" X_test = valid[feats]\n",
" y_test = valid[\"Target\"]\n",
" \n",
" # fitting\n",
" model.fit(X, y, verbose=False, eval_set=(X_test, y_test))\n",
" \n",
" # saving model\n",
" joblib.dump(model, f'{model_path}/model')\n",
"\n",
" #Save the test_data as a pickle file to be used by the predict component.\n",
" with open(f'{model_path}/test', 'wb') as f:\n",
" pickle.dump((X_test, y_test), f)\n",
" \n",
" return(print('Done!')) "
]
},
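{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once a run has completed, the serialized model can be copied off the pipeline volume and inspected locally. A sketch, assuming a local copy of the `model` file written by the component above:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: inspect a trained model from a local copy of {model_path}/model\n",
"# (the path below is an assumption; copy the file off the volume first)\n",
"# import joblib\n",
"# model = joblib.load('model')\n",
"# for name, score in sorted(zip(model.feature_name_, model.feature_importances_),\n",
"#                           key=lambda t: -t[1])[:10]:\n",
"#     print(name, score)"
]
},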
{
"cell_type": "markdown",
"metadata": {
"papermill": {
"duration": 0.01428,
"end_time": "2022-04-17T07:17:23.959655",
"exception": false,
"start_time": "2022-04-17T07:17:23.945375",
"status": "completed"
},
"tags": []
},
"source": [
"## Evaluation and Prediction"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"def prediction(model_path: InputPath(str), \n",
" metrics_path: OutputPath(str)) -> NamedTuple(\"EvaluationOutput\", [(\"mlpipeline_metrics\", \"Metrics\")]):\n",
" \n",
" # import Library\n",
" import sys, subprocess;\n",
" subprocess.run([\"python\", \"-m\", \"pip\", \"install\", \"--upgrade\", \"pip\"])\n",
" subprocess.run([sys.executable, '-m', 'pip', 'install','scikit-learn'])\n",
" subprocess.run([sys.executable, '-m', 'pip', 'install','pandas'])\n",
" subprocess.run([sys.executable, '-m', 'pip', 'install','lightgbm'])\n",
" import pickle, json;\n",
" import pandas as pd\n",
" import numpy as np\n",
" from collections import namedtuple\n",
" import joblib\n",
" from sklearn.metrics import mean_squared_error\n",
" from lightgbm import LGBMRegressor\n",
"\n",
" \n",
" \n",
" # load test_data\n",
" with open(f'{model_path}/test', 'rb') as f:\n",
" X_test, y_test = pickle.load(f)\n",
" \n",
" # load model\n",
" model = joblib.load(f'{model_path}/model')\n",
" \n",
" # model prediction\n",
" preds = model.predict(X_test)\n",
" \n",
" # model evaluation\n",
" rmse = np.round(mean_squared_error(preds, y_test)**0.5, 5)\n",
" \n",
" # create kubeflow metric metadata for UI \n",
" metrics = {\n",
" 'metrics': [\n",
" {'name': 'root-mean-squared-error',\n",
" 'numberValue': rmse,\n",
" 'format': 'RAW'},\n",
" ]\n",
" }\n",
" \n",
"\n",
" with open(metrics_path, \"w\") as f:\n",
" json.dump(metrics, f)\n",
"\n",
" output_tuple = namedtuple(\"EvaluationOutput\", [\"mlpipeline_metrics\"])\n",
"\n",
" return output_tuple(json.dumps(metrics))"
]
},
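{
"cell_type": "markdown",
"metadata": {},
"source": [
"The reported metric is the root mean squared error over the validation window,\n",
"\n",
"$$\\mathrm{RMSE} = \\sqrt{\\frac{1}{n}\\sum_{i=1}^{n}\\left(\\hat{y}_i - y_i\\right)^2},$$\n",
"\n",
"which is exactly what `mean_squared_error(...) ** 0.5` computes above."
]
},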
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create pipeline components \n",
"\n",
"using `create_component_from_func`"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# create light weight components\n",
"load_op = comp.create_component_from_func(load_data,base_image=\"python:3.7.1\")\n",
"transform_op = comp.create_component_from_func(transform_data,base_image=\"python:3.7.1\")\n",
"feature_eng_op = comp.create_component_from_func(feature_engineering,base_image=\"python:3.7.1\")\n",
"modeling_op = comp.create_component_from_func(modeling, base_image=\"python:3.7.1\")\n",
"predict_op = comp.create_component_from_func(prediction, base_image=\"python:3.7.1\")"
]
},
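{
"cell_type": "markdown",
"metadata": {},
"source": [
"`create_component_from_func` can also persist a reusable component spec via its `output_component_file` argument, which can later be loaded with `comp.load_component_from_file`. A sketch (the file name is an example):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: save a component spec to YAML for sharing/reuse\n",
"# load_op = comp.create_component_from_func(\n",
"#     load_data, base_image=\"python:3.7.1\",\n",
"#     output_component_file=\"load_data_component.yaml\")\n",
"# reload_op = comp.load_component_from_file(\"load_data_component.yaml\")"
]
},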
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Kubeflow pipeline creation"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"# define pipeline\n",
"@dsl.pipeline(name=\"jpx-tokyo-stock-exchange\", \n",
" description=\"Predicting real future returns of around 2,000 stocks.\")\n",
"\n",
"# Define parameters to be fed into pipeline\n",
"def tokyo_stock_exchange_pipeline(\n",
" dataset: str,\n",
" data_path: str,\n",
" transform_data_path: str, \n",
" feat_eng_data_path: str,\n",
" model_path:str\n",
" ):\n",
"\n",
" vop = dsl.VolumeOp(\n",
" name=\"create_volume\",\n",
" resource_name=\"data-volume\", \n",
" size=\"2Gi\", \n",
" modes=dsl.VOLUME_MODE_RWO)\n",
" \n",
" # Create load container.\n",
" load_container = load_op(dataset).add_pvolumes({\"/mnt\": vop.volume}).add_pod_label(\"kaggle-secret\", \"true\")\n",
" # Create transform container.\n",
" transform_container = transform_op(load_container.output)\n",
" # Create feature engineering container.\n",
" feature_eng_container = feature_eng_op(transform_container.output)\n",
" # Create modeling container.\n",
" modeling_container = modeling_op(feature_eng_container.output)\n",
" # Create prediction container.\n",
" predict_container = predict_op(modeling_container.output)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"# create client that would enable communication with the Pipelines API server \n",
"client = kfp.Client()"
]
},
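{
"cell_type": "markdown",
"metadata": {},
"source": [
"Inside a Kubeflow notebook server, `kfp.Client()` picks up the in-cluster configuration automatically. From outside the cluster you would pass the API endpoint explicitly, e.g. after port-forwarding the `ml-pipeline-ui` service (the endpoint below is an example):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch for connecting from outside the cluster (endpoint is an example):\n",
"#   kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80\n",
"# client = kfp.Client(host=\"http://localhost:8080\")"
]
},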
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"# arguments\n",
"dataset = \"jpx-tokyo-stock-exchange-prediction\"\n",
"data_path = \"mnt/data\"\n",
"transform_data_path = \"tdp\"\n",
"feat_eng_data_path = \"feat\"\n",
"model_path = \"model\""
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<a href=\"/pipeline/#/experiments/details/124926a8-cc3d-4726-a356-5169e84ed762\" target=\"_blank\" >Experiment details</a>."
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<a href=\"/pipeline/#/runs/details/0fa09cdf-c976-4887-bcfa-a24b3968e294\" target=\"_blank\" >Run details</a>."
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"pipeline_func = tokyo_stock_exchange_pipeline\n",
"\n",
"experiment_name = 'tokyo_stock_exchange_pipeline_lightweight'\n",
"run_name = pipeline_func.__name__ + ' run1'\n",
"\n",
"arguments = {\n",
" \"dataset\": dataset,\n",
" \"data_path\": data_path,\n",
" \"transform_data_path\": transform_data_path,\n",
" \"feat_eng_data_path\": feat_eng_data_path,\n",
" \"model_path\":model_path\n",
" }\n",
"\n",
"# Compile pipeline to generate compressed YAML definition of the pipeline.\n",
"kfp.compiler.Compiler().compile(pipeline_func, \n",
" '{}.zip'.format(experiment_name))\n",
"\n",
"# Submit pipeline directly from pipeline function\n",
"run_result = client.create_run_from_pipeline_func(pipeline_func, \n",
" experiment_name=experiment_name, \n",
" run_name=run_name, \n",
" arguments=arguments\n",
" )\n"
]
},
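{
"cell_type": "markdown",
"metadata": {},
"source": [
"`create_run_from_pipeline_func` returns as soon as the run is submitted. To block the notebook until the run finishes, poll with `wait_for_run_completion` (timeout in seconds):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Block until the submitted run completes (raises on timeout)\n",
"client.wait_for_run_completion(run_result.run_id, timeout=3600)"
]
},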
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"kubeflow_notebook": {
"autosnapshot": true,
"experiment": {
"id": "new",
"name": "jpx-tokyo-stock-exchange"
},
"experiment_name": "jpx-tokyo-stock-exchange",
"katib_metadata": {
"algorithm": {
"algorithmName": "grid"
},
"maxFailedTrialCount": 3,
"maxTrialCount": 12,
"objective": {
"objectiveMetricName": "",
"type": "minimize"
},
"parallelTrialCount": 3,
"parameters": []
},
"katib_run": false,
"pipeline_description": "JPX Tokyo Stock Exchange Prediction",
"pipeline_name": "jpx-tokyo-stock-exchange-pipeline",
"snapshot_volumes": true,
"steps_defaults": [
"label:access-ml-pipeline:true",
"label:kaggle-secret:true",
"label:access-rok:true"
],
"volume_access_mode": "rwm",
"volumes": [
{
"annotations": [],
"mount_point": "/home/jovyan",
"name": "dem-workspace-snqdc",
"size": 5,
"size_type": "Gi",
"snapshot": false,
"type": "clone"
}
]
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
},
"papermill": {
"default_parameters": {},
"duration": 32.012084,
"end_time": "2022-04-17T07:17:25.053666",
"environment_variables": {},
"exception": null,
"input_path": "__notebook__.ipynb",
"output_path": "__notebook__.ipynb",
"parameters": {},
"start_time": "2022-04-17T07:16:53.041582",
"version": "2.3.4"
}
},
"nbformat": 4,
"nbformat_minor": 5
}