from kfp import dsl
from kfp import compiler
from kfp.dsl import (Input, Output, Dataset, Model, Metrics, ClassificationMetrics)


@dsl.component(
    base_image="python:3.10",
    packages_to_install=["numpy", "pandas", "loguru"])
def load_data(
    data_url: str,
    data_size: int,
    credit_risk_dataset: Output[Dataset]):

    '''
    Downloads the credit_risk_dataset.csv file and generates
    additional synthetic data for benchmarking and testing purposes.

    Input Parameters
    ----------------
    data_url : str
        URL where the dataset is hosted
    data_size : int
        desired size of the final dataset, default 1M rows

    Output Artifacts
    ----------------
    credit_risk_dataset : Dataset
        data that has been synthetically augmented or loaded from the provided URL
    '''

    import numpy as np
    import pandas as pd
    from loguru import logger

    logger.info("Loading csv from {}", data_url)
    data = pd.read_csv(data_url)
    logger.info("Done!")

    # number of rows to generate
    if data_size < data.shape[0]:
        pass
    else:
        logger.info("Generating {:,} rows of data...", data_size)
        repeats = data_size // len(data)
        data = data.loc[np.repeat(data.index.values, repeats + 1)]
        data = data.iloc[:data_size]

        # perturbing all int/float columns
        person_age = data["person_age"].values + np.random.randint(
            -1, 1, size=len(data)
        )
        person_income = data["person_income"].values + np.random.normal(
            0, 10, size=len(data)
        )
        person_emp_length = data[
            "person_emp_length"
        ].values + np.random.randint(-1, 1, size=len(data))
        loan_amnt = data["loan_amnt"].values + np.random.normal(
            0, 5, size=len(data)
        )
        loan_int_rate = data["loan_int_rate"].values + np.random.normal(
            0, 0.2, size=len(data)
        )
        loan_percent_income = data["loan_percent_income"].values + (
            np.random.randint(0, 100, size=len(data)) / 1000
        )
        cb_person_cred_hist_length = data[
            "cb_person_cred_hist_length"
        ].values + np.random.randint(0, 2, size=len(data))

        # perturbing all categorical columns by resampling ~10% of the values
        perturb_idx = np.random.rand(len(data)) > 0.1
        random_values = np.random.choice(
            data["person_home_ownership"].unique(), len(data)
        )
        person_home_ownership = np.where(
            perturb_idx, data["person_home_ownership"], random_values
        )
        perturb_idx = np.random.rand(len(data)) > 0.1
        random_values = np.random.choice(
            data["loan_intent"].unique(), len(data)
        )
        loan_intent = np.where(perturb_idx, data["loan_intent"], random_values)
        perturb_idx = np.random.rand(len(data)) > 0.1
        random_values = np.random.choice(
            data["loan_grade"].unique(), len(data)
        )
        loan_grade = np.where(perturb_idx, data["loan_grade"], random_values)
        perturb_idx = np.random.rand(len(data)) > 0.1
        random_values = np.random.choice(
            data["cb_person_default_on_file"].unique(), len(data)
        )
        cb_person_default_on_file = np.where(
            perturb_idx, data["cb_person_default_on_file"], random_values
        )
        data = pd.DataFrame(
            list(
                zip(
                    person_age,
                    person_income,
                    person_home_ownership,
                    person_emp_length,
                    loan_intent,
                    loan_grade,
                    loan_amnt,
                    loan_int_rate,
                    data["loan_status"].values,
                    loan_percent_income,
                    cb_person_default_on_file,
                    cb_person_cred_hist_length,
                )
            ),
            columns=data.columns,
        )

    data = data.drop_duplicates()
    assert len(data) == data_size
    data = data.reset_index(drop=True)

    data.to_csv(credit_risk_dataset.path, index=None)


@dsl.component(
    base_image="python:3.10",
    packages_to_install=["pandas", "scikit-learn", "loguru"])
def create_train_test_set(
    data: Input[Dataset],
    x_train_data: Output[Dataset],
    y_train_data: Output[Dataset],
    x_test_data: Output[Dataset],
    y_test_data: Output[Dataset]):

    '''
    Creates a 75:25 train/test split of the input dataset for model evaluation.

    Input Artifacts
    ---------------
    data : Dataset
        dataset that has been synthetically augmented by the load_data() function

    Output Artifacts
    ----------------
    x_train_data : Dataset
        training features, 75% of the original dataset
    y_train_data : Dataset
        training labels of the target variable, loan_status
    x_test_data : Dataset
        test features, 25% of the original dataset
    y_test_data : Dataset
        test labels of the target variable, loan_status
    '''

    import pandas as pd
    from loguru import logger
    from sklearn.model_selection import train_test_split

    data = pd.read_csv(data.path)

    logger.info("Creating training and test sets...")
    train, test = train_test_split(data, test_size=0.25, random_state=0)

    X_train = train.drop(["loan_status"], axis=1)
    y_train = train["loan_status"]

    X_test = test.drop(["loan_status"], axis=1)
    y_test = test["loan_status"]

    logger.info("Training and test sets created.\n"
                "X_train size: {}, y_train size: {}\n"
                "X_test size: {}, y_test size: {}",
                X_train.shape, y_train.shape, X_test.shape, y_test.shape)

    X_train.to_csv(x_train_data.path, index=False)
    y_train.to_csv(y_train_data.path, index=False, header=None)
    X_test.to_csv(x_test_data.path, index=False)
    y_test.to_csv(y_test_data.path, index=False, header=None)


@dsl.component(
    base_image="python:3.10",
    packages_to_install=["pandas", "scikit-learn"])
def preprocess_features(
    x_train: Input[Dataset],
    x_test: Input[Dataset],
    x_train_processed: Output[Dataset],
    x_test_processed: Output[Dataset]):

    '''
    Performs data preprocessing of training and test features.

    Input Artifacts
    ---------------
    x_train : Dataset
        original unprocessed training features
    x_test : Dataset
        original unprocessed test features

    Output Artifacts
    ----------------
    x_train_processed : Dataset
        processed and scaled training features
    x_test_processed : Dataset
        processed and scaled test features
    '''

    import pandas as pd
    from sklearn.compose import ColumnTransformer
    from sklearn.impute import SimpleImputer
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import OneHotEncoder, PowerTransformer

    X_train = pd.read_csv(x_train.path)
    X_test = pd.read_csv(x_test.path)

    # data processing pipeline
    num_imputer = Pipeline(steps=[("imputer", SimpleImputer(strategy="median"))])
    pow_transformer = PowerTransformer()
    cat_transformer = OneHotEncoder(handle_unknown="ignore")
    preprocessor = ColumnTransformer(
        transformers=[
            (
                "num",
                num_imputer,
                [
                    "loan_int_rate",
                    "person_emp_length",
                    "cb_person_cred_hist_length",
                ],
            ),
            (
                "pow",
                pow_transformer,
                ["person_age", "person_income", "loan_amnt", "loan_percent_income"],
            ),
            (
                "cat",
                cat_transformer,
                [
                    "person_home_ownership",
                    "loan_intent",
                    "loan_grade",
                    "cb_person_default_on_file",
                ],
            ),
        ],
        remainder="passthrough",
    )

    preprocess = Pipeline(steps=[("preprocessor", preprocessor)])

    X_train = pd.DataFrame(preprocess.fit_transform(X_train))
    X_test = pd.DataFrame(preprocess.transform(X_test))

    X_train.to_csv(x_train_processed.path, index=False, header=None)
    X_test.to_csv(x_test_processed.path, index=False, header=None)


@dsl.component(
    base_image="python:3.10",
    packages_to_install=["pandas", "xgboost", "joblib", "loguru"])
def train_xgboost_model(
    x_train: Input[Dataset],
    y_train: Input[Dataset],
    xgb_model: Output[Model]):

    '''
    Trains an XGBoost classification model.

    Input Artifacts
    ---------------
    x_train : Dataset
        processed and scaled training features
    y_train : Dataset
        training labels of target variable, loan_status

    Output Artifacts
    ----------------
    xgb_model : Model
        trained XGBoost model
    '''

    import joblib
    import pandas as pd
    import xgboost as xgb
    from loguru import logger

    X_train = pd.read_csv(x_train.path, header=None)
    y_train = pd.read_csv(y_train.path, header=None)

    dtrain = xgb.DMatrix(X_train.values, y_train.values)

    # define model parameters
    params = {
        "objective": "binary:logistic",
        "eval_metric": "logloss",
        "nthread": 4,  # num_cpu
        "tree_method": "hist",
        "learning_rate": 0.02,
        "max_depth": 10,
        "min_child_weight": 6,
        "n_jobs": 4,  # num_cpu
        "verbosity": 1
    }

    # train XGBoost model
    logger.info("Training XGBoost model...")
    clf = xgb.train(params=params,
                    dtrain=dtrain,
                    num_boost_round=500)

    with open(xgb_model.path, "wb") as file_writer:
        joblib.dump(clf, file_writer)


@dsl.component(
    base_image="python:3.10",
    packages_to_install=["daal4py", "joblib", "loguru"])
def convert_xgboost_to_daal4py(
    xgb_model: Input[Model],
    daal4py_model: Output[Model]):

    '''
    Converts XGBoost model to inference-optimized daal4py classifier.

    Input Artifacts
    ---------------
    xgb_model : Model
        trained XGBoost classifier

    Output Artifacts
    ----------------
    daal4py_model : Model
        inference-optimized daal4py classifier
    '''

    import daal4py as d4p
    import joblib
    from loguru import logger

    with open(xgb_model.path, "rb") as file_reader:
        clf = joblib.load(file_reader)

    logger.info("Converting XGBoost model to daal4py...")
    daal_model = d4p.get_gbt_model_from_xgboost(clf)
    logger.info("Done!")

    with open(daal4py_model.path, "wb") as file_writer:
        joblib.dump(daal_model, file_writer)


@dsl.component(
    base_image="python:3.10",
    packages_to_install=["daal4py", "pandas", "scikit-learn",
                         "scikit-learn-intelex", "joblib"])
def daal4py_inference(
    x_test: Input[Dataset],
    y_test: Input[Dataset],
    daal4py_model: Input[Model],
    prediction_data: Output[Dataset],
    report: Output[Dataset],
    metrics: Output[Metrics]
):

    '''
    Computes predictions using the inference-optimized daal4py classifier
    and evaluates model performance.

    Input Artifacts
    ---------------
    x_test : Dataset
        processed and scaled test features
    y_test : Dataset
        test labels of target variable, loan_status
    daal4py_model : Model
        inference-optimized daal4py classifier

    Output Artifacts
    ----------------
    prediction_data : Dataset
        dataset containing true test labels and predicted probabilities
    report : Dataset
        summary of the precision, recall, F1 score for each class
    metrics : Metrics
        scalar classification metrics containing the model's AUC and accuracy
    '''

    import daal4py as d4p
    import joblib
    import pandas as pd

    from sklearnex import patch_sklearn
    patch_sklearn()
    from sklearn.metrics import roc_auc_score, accuracy_score, classification_report

    X_test = pd.read_csv(x_test.path, header=None)
    y_test = pd.read_csv(y_test.path, header=None)

    with open(daal4py_model.path, "rb") as file_reader:
        daal_model = joblib.load(file_reader)

    daal_prediction = d4p.gbt_classification_prediction(
        nClasses=2,
        resultsToEvaluate="computeClassLabels|computeClassProbabilities"
    ).compute(X_test, daal_model)

    y_pred = daal_prediction.prediction
    y_prob = daal_prediction.probabilities[:, 1]

    results = classification_report(
        y_test, y_pred,
        target_names=["Non-Default", "Default"],
        output_dict=True
    )
    results = pd.DataFrame(results).transpose()
    results.to_csv(report.path)

    auc = roc_auc_score(y_test, y_prob)
    metrics.log_metric('AUC', auc)

    accuracy = (accuracy_score(y_test, y_pred) * 100)
    metrics.log_metric('Accuracy', accuracy)

    predictions = pd.DataFrame({'y_test': y_test.values.flatten(),
                                'y_prob': y_prob})
    predictions.to_csv(prediction_data.path, index=False)


@dsl.component(
    base_image="python:3.10",
    packages_to_install=["numpy", "pandas", "scikit-learn",
                         "scikit-learn-intelex"])
def plot_roc_curve(
    predictions: Input[Dataset],
    class_metrics: Output[ClassificationMetrics]
):

    '''
    Function to plot Receiver Operating Characteristic (ROC) curve.

    Input Artifacts
    ---------------
    predictions : Dataset
        dataset containing true test labels and predicted probabilities

    Output Artifacts
    ----------------
    class_metrics : ClassificationMetrics
        classification metrics containing fpr, tpr, and thresholds
    '''

    import pandas as pd
    from numpy import inf

    from sklearnex import patch_sklearn
    patch_sklearn()
    from sklearn.metrics import roc_curve

    prediction_data = pd.read_csv(predictions.path)

    fpr, tpr, thresholds = roc_curve(
        y_true=prediction_data['y_test'],
        y_score=prediction_data['y_prob'],
        pos_label=1)
    thresholds[thresholds == inf] = 0

    class_metrics.log_roc_curve(fpr, tpr, thresholds)


@dsl.pipeline
def intel_xgboost_daal4py_pipeline(
    data_url: str,
    data_size: int):

    load_data_op = load_data(
        data_url=data_url, data_size=data_size
    )

    create_train_test_set_op = create_train_test_set(
        data=load_data_op.outputs['credit_risk_dataset']
    )

    preprocess_features_op = preprocess_features(
        x_train=create_train_test_set_op.outputs['x_train_data'],
        x_test=create_train_test_set_op.outputs['x_test_data']
    )

    train_xgboost_model_op = train_xgboost_model(
        x_train=preprocess_features_op.outputs['x_train_processed'],
        y_train=create_train_test_set_op.outputs['y_train_data']
    )

    convert_xgboost_to_daal4py_op = convert_xgboost_to_daal4py(
        xgb_model=train_xgboost_model_op.outputs['xgb_model']
    )

    daal4py_inference_op = daal4py_inference(
        x_test=preprocess_features_op.outputs['x_test_processed'],
        y_test=create_train_test_set_op.outputs['y_test_data'],
        daal4py_model=convert_xgboost_to_daal4py_op.outputs['daal4py_model']
    )

    plot_roc_curve_op = plot_roc_curve(
        predictions=daal4py_inference_op.outputs['prediction_data']
    )


if __name__ == '__main__':
    # Compiling the pipeline
    compiler.Compiler().compile(
        pipeline_func=intel_xgboost_daal4py_pipeline,
        package_path='intel-xgboost-daal4py-pipeline.yaml')
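
    # --- Illustrative only: submitting the compiled pipeline (sketch) ---
    # One possible way to run the compiled YAML package on a Kubeflow Pipelines
    # endpoint using the KFP SDK client. The host URL and the argument values
    # below are placeholders, not values defined in this file.
    #
    # from kfp.client import Client
    #
    # client = Client(host="http://localhost:8080")  # placeholder KFP endpoint
    # client.create_run_from_pipeline_package(
    #     'intel-xgboost-daal4py-pipeline.yaml',
    #     arguments={
    #         'data_url': '<URL of credit_risk_dataset.csv>',  # placeholder
    #         'data_size': 1000000,
    #     },
    # )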