# pipelines/samples/contrib/intel-oneapi-samples/intel_xgboost_daal4py_pipel...

from kfp import dsl
from kfp import compiler
from kfp.dsl import (Input, Output, Dataset, Model, Metrics, ClassificationMetrics)

@dsl.component(
base_image="python:3.10",
packages_to_install=["numpy", "pandas", "loguru"])
def load_data(
data_url: str,
data_size: int,
credit_risk_dataset: Output[Dataset]):
'''
    Downloads the credit_risk_dataset.csv file and generates additional
    synthetic data for benchmarking and testing purposes.
Input Parameters
----------------
data_url : str
url where the dataset is hosted
data_size : int
        desired number of rows in the final dataset
Output Artifacts
----------------
credit_risk_dataset : Dataset
data that has been synthetically augmented or loaded from URL provided
'''
import numpy as np
import pandas as pd
from loguru import logger
logger.info("Loading csv from {}", data_url)
data = pd.read_csv(data_url)
logger.info("Done!")
    # generate additional synthetic rows only when the requested size
    # exceeds the number of rows in the source data
    if data_size > data.shape[0]:
        logger.info("Generating {:,} rows of data...", data_size)
repeats = data_size // len(data)
data = data.loc[np.repeat(data.index.values, repeats + 1)]
data = data.iloc[:data_size]
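        # the source rows are tiled to cover the requested size and truncated
        # to exactly data_size rows; the per-column perturbations below then
        # make each copy distinct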
# perturbing all int/float columns
person_age = data["person_age"].values + np.random.randint(
-1, 1, size=len(data)
)
person_income = data["person_income"].values + np.random.normal(
0, 10, size=len(data)
)
person_emp_length = data[
"person_emp_length"
].values + np.random.randint(-1, 1, size=len(data))
loan_amnt = data["loan_amnt"].values + np.random.normal(
0, 5, size=len(data)
)
loan_int_rate = data["loan_int_rate"].values + np.random.normal(
0, 0.2, size=len(data)
)
loan_percent_income = data["loan_percent_income"].values + (
np.random.randint(0, 100, size=len(data)) / 1000
)
cb_person_cred_hist_length = data[
"cb_person_cred_hist_length"
].values + np.random.randint(0, 2, size=len(data))
# perturbing all binary columns
perturb_idx = np.random.rand(len(data)) > 0.1
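        # perturb_idx keeps ~90% of the original categorical values; the
        # remaining ~10% of rows receive a category drawn at random from the
        # observed values of the column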
random_values = np.random.choice(
data["person_home_ownership"].unique(), len(data)
)
person_home_ownership = np.where(
perturb_idx, data["person_home_ownership"], random_values
)
perturb_idx = np.random.rand(len(data)) > 0.1
random_values = np.random.choice(
data["loan_intent"].unique(), len(data)
)
loan_intent = np.where(perturb_idx, data["loan_intent"], random_values)
perturb_idx = np.random.rand(len(data)) > 0.1
random_values = np.random.choice(
data["loan_grade"].unique(), len(data)
)
loan_grade = np.where(perturb_idx, data["loan_grade"], random_values)
perturb_idx = np.random.rand(len(data)) > 0.1
random_values = np.random.choice(
data["cb_person_default_on_file"].unique(), len(data)
)
cb_person_default_on_file = np.where(
perturb_idx, data["cb_person_default_on_file"], random_values
)
data = pd.DataFrame(
list(
zip(
person_age,
person_income,
person_home_ownership,
person_emp_length,
loan_intent,
loan_grade,
loan_amnt,
loan_int_rate,
data["loan_status"].values,
loan_percent_income,
cb_person_default_on_file,
cb_person_cred_hist_length,
)
),
columns = data.columns,
)
data = data.drop_duplicates()
assert len(data) == data_size
    data = data.reset_index(drop = True)
    data.to_csv(credit_risk_dataset.path, index = False)

@dsl.component(
base_image="python:3.10",
packages_to_install=["pandas", "scikit-learn", "loguru"])
def create_train_test_set(
data: Input[Dataset],
x_train_data: Output[Dataset],
y_train_data: Output[Dataset],
x_test_data: Output[Dataset],
y_test_data: Output[Dataset]):
'''
    Creates a 75:25 train/test split of the input dataset for model evaluation.
Input Artifacts
---------------
data : Dataset
dataset that has been synthetically augmented by the load_data() function
Output Artifacts
----------------
x_train_data : Dataset
training features, 75% of original dataset
y_train_data : Dataset
training labels of target variable, loan_status
x_test_data : Dataset
test features, 25% of original dataset
y_test_data : Dataset
test labels of target variable, loan_status
'''
import pandas as pd
from loguru import logger
from sklearn.model_selection import train_test_split
data = pd.read_csv(data.path)
logger.info("Creating training and test sets...")
train, test = train_test_split(data, test_size = 0.25, random_state = 0)
X_train = train.drop(["loan_status"], axis = 1)
y_train = train["loan_status"]
X_test = test.drop(["loan_status"], axis = 1)
y_test = test["loan_status"]
logger.info("Training and test sets created.\n" \
"X_train size: {}, y_train size: {}\n" \
"X_test size: {}, y_test size: {}",
X_train.shape, y_train.shape, X_test.shape, y_test.shape)
    X_train.to_csv(x_train_data.path, index = False)
    y_train.to_csv(y_train_data.path, index = False, header = False)
    X_test.to_csv(x_test_data.path, index = False)
    y_test.to_csv(y_test_data.path, index = False, header = False)

@dsl.component(
base_image="python:3.10",
packages_to_install=["pandas", "scikit-learn"])
def preprocess_features(
x_train: Input[Dataset],
x_test: Input[Dataset],
x_train_processed: Output[Dataset],
x_test_processed: Output[Dataset]):
'''
Performs data preprocessing of training and test features.
Input Artifacts
---------------
x_train : Dataset
original unprocessed training features
x_test : Dataset
original unprocessed test features
Output Artifacts
----------------
x_train_processed : Dataset
processed and scaled training features
x_test_processed : Dataset
processed and scaled test features
'''
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, PowerTransformer
X_train = pd.read_csv(x_train.path)
X_test = pd.read_csv(x_test.path)
# data processing pipeline
num_imputer = Pipeline(steps=[("imputer", SimpleImputer(strategy = "median"))])
pow_transformer = PowerTransformer()
cat_transformer = OneHotEncoder(handle_unknown = "ignore")
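    # handle_unknown="ignore" encodes categories unseen during fit as all-zero
    # vectors instead of raising an error when the test set is transformed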
preprocessor = ColumnTransformer(
transformers = [
(
"num",
num_imputer,
[
"loan_int_rate",
"person_emp_length",
"cb_person_cred_hist_length",
],
),
(
"pow",
pow_transformer,
["person_age", "person_income", "loan_amnt", "loan_percent_income"],
),
(
"cat",
cat_transformer,
[
"person_home_ownership",
"loan_intent",
"loan_grade",
"cb_person_default_on_file",
],
),
],
remainder="passthrough",
)
preprocess = Pipeline(steps = [("preprocessor", preprocessor)])
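    # fit the transforms on the training set only and reuse them on the test
    # set, so no test-set statistics leak into preprocessing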
X_train = pd.DataFrame(preprocess.fit_transform(X_train))
X_test = pd.DataFrame(preprocess.transform(X_test))
    X_train.to_csv(x_train_processed.path, index = False, header = False)
    X_test.to_csv(x_test_processed.path, index = False, header = False)

@dsl.component(
base_image="python:3.10",
packages_to_install=["pandas", "xgboost", "joblib", "loguru"])
def train_xgboost_model(
x_train: Input[Dataset],
y_train: Input[Dataset],
xgb_model: Output[Model]):
'''
Trains an XGBoost classification model.
Input Artifacts
---------------
x_train : Dataset
processed and scaled training features
y_train : Dataset
training labels of target variable, loan_status
Output Artifacts
----------------
xgb_model : Model
trained XGBoost model
'''
import joblib
import pandas as pd
import xgboost as xgb
from loguru import logger
X_train = pd.read_csv(x_train.path, header = None)
y_train = pd.read_csv(y_train.path, header = None)
dtrain = xgb.DMatrix(X_train.values, y_train.values)
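    # DMatrix is XGBoost's internal data structure, optimized for memory
    # efficiency and training speed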
# define model parameters
params = {
"objective": "binary:logistic",
"eval_metric": "logloss",
"nthread": 4, # num_cpu
"tree_method": "hist",
"learning_rate": 0.02,
"max_depth": 10,
"min_child_weight": 6,
"n_jobs": 4, # num_cpu,
"verbosity": 1
}
# train XGBoost model
logger.info("Training XGBoost model...")
clf = xgb.train(params = params,
dtrain = dtrain,
num_boost_round = 500)
with open(xgb_model.path, "wb") as file_writer:
        joblib.dump(clf, file_writer)

@dsl.component(
base_image="python:3.10",
packages_to_install=["daal4py", "joblib", "loguru"])
def convert_xgboost_to_daal4py(
xgb_model: Input[Model],
daal4py_model: Output[Model]):
'''
Converts XGBoost model to inference-optimized daal4py classifier.
Input Artifacts
---------------
xgb_model : Model
trained XGBoost classifier
Output Artifacts
----------------
daal4py_model : Model
inference-optimized daal4py classifier
'''
import daal4py as d4p
import joblib
from loguru import logger
with open(xgb_model.path, "rb") as file_reader:
clf = joblib.load(file_reader)
logger.info("Converting XGBoost model to Daal4py...")
daal_model = d4p.get_gbt_model_from_xgboost(clf)
logger.info("Done!")
with open(daal4py_model.path, "wb") as file_writer:
        joblib.dump(daal_model, file_writer)

@dsl.component(
base_image="python:3.10",
packages_to_install=["daal4py", "pandas", "scikit-learn",
"scikit-learn-intelex", "joblib"])
def daal4py_inference(
x_test: Input[Dataset],
y_test: Input[Dataset],
daal4py_model: Input[Model],
prediction_data: Output[Dataset],
report: Output[Dataset],
metrics: Output[Metrics]
):
'''
Computes predictions using the inference-optimized daal4py classifier
and evaluates model performance.
Input Artifacts
---------------
x_test : Dataset
processed and scaled test features
y_test : Dataset
test labels of target variable, loan_status
daal4py_model : Model
inference-optimized daal4py classifier
Output Artifacts
----------------
prediction_data : Dataset
dataset containing true test labels and predicted probabilities
report : Dataset
        summary of precision, recall, and F1 score for each class
metrics : Metrics
scalar classification metrics containing the model's AUC and accuracy
'''
import daal4py as d4p
import joblib
import pandas as pd
from sklearnex import patch_sklearn
patch_sklearn()
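    # patch_sklearn() must be called before the sklearn import below so that
    # the accelerated Intel Extension for Scikit-learn implementations are used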
from sklearn.metrics import roc_auc_score, accuracy_score, classification_report
X_test = pd.read_csv(x_test.path, header = None)
y_test = pd.read_csv(y_test.path, header = None)
with open(daal4py_model.path, "rb") as file_reader:
daal_model = joblib.load(file_reader)
daal_prediction = d4p.gbt_classification_prediction(
nClasses = 2,
resultsToEvaluate = "computeClassLabels|computeClassProbabilities"
).compute(X_test, daal_model)
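    # resultsToEvaluate requests both predicted labels and class probabilities
    # in a single pass over the test data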
y_pred = daal_prediction.prediction
y_prob = daal_prediction.probabilities[:,1]
results = classification_report(
y_test, y_pred,
target_names = ["Non-Default", "Default"],
output_dict = True
)
results = pd.DataFrame(results).transpose()
results.to_csv(report.path)
auc = roc_auc_score(y_test, y_prob)
metrics.log_metric('AUC', auc)
    accuracy = accuracy_score(y_test, y_pred) * 100
metrics.log_metric('Accuracy', accuracy)
predictions = pd.DataFrame({'y_test': y_test.values.flatten(),
'y_prob': y_prob})
    predictions.to_csv(prediction_data.path, index = False)

@dsl.component(
base_image="python:3.10",
packages_to_install=["numpy", "pandas", "scikit-learn",
"scikit-learn-intelex"])
def plot_roc_curve(
predictions: Input[Dataset],
class_metrics: Output[ClassificationMetrics]
):
'''
    Plots the Receiver Operating Characteristic (ROC) curve.
Input Artifacts
---------------
predictions : Dataset
dataset containing true test labels and predicted probabilities
Output Artifacts
----------------
class_metrics : ClassificationMetrics
classification metrics containing fpr, tpr, and thresholds
'''
import pandas as pd
from numpy import inf
from sklearnex import patch_sklearn
patch_sklearn()
from sklearn.metrics import roc_curve
prediction_data = pd.read_csv(predictions.path)
fpr, tpr, thresholds = roc_curve(
y_true = prediction_data['y_test'],
y_score = prediction_data['y_prob'],
pos_label = 1)
thresholds[thresholds == inf] = 0
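    # newer scikit-learn versions prepend an infinite threshold to the
    # roc_curve output; replace it with 0 because KFP metadata must be
    # JSON-serializable and JSON cannot represent inf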
    class_metrics.log_roc_curve(fpr, tpr, thresholds)

@dsl.pipeline(name="intel-xgboost-daal4py-pipeline")
def intel_xgboost_daal4py_pipeline(
data_url: str,
data_size: int):
load_data_op = load_data(
data_url = data_url, data_size = data_size
)
create_train_test_set_op = create_train_test_set(
data = load_data_op.outputs['credit_risk_dataset']
)
preprocess_features_op = preprocess_features(
x_train = create_train_test_set_op.outputs['x_train_data'],
x_test = create_train_test_set_op.outputs['x_test_data']
)
train_xgboost_model_op = train_xgboost_model(
x_train = preprocess_features_op.outputs['x_train_processed'],
y_train = create_train_test_set_op.outputs['y_train_data']
)
convert_xgboost_to_daal4py_op = convert_xgboost_to_daal4py(
xgb_model = train_xgboost_model_op.outputs['xgb_model']
)
daal4py_inference_op = daal4py_inference(
x_test = preprocess_features_op.outputs['x_test_processed'],
y_test = create_train_test_set_op.outputs['y_test_data'],
daal4py_model = convert_xgboost_to_daal4py_op.outputs['daal4py_model']
)
plot_roc_curve_op = plot_roc_curve(
predictions = daal4py_inference_op.outputs['prediction_data']
    )

if __name__ == '__main__':
# Compiling the pipeline
compiler.Compiler().compile(
pipeline_func = intel_xgboost_daal4py_pipeline,
package_path = 'intel-xgboost-daal4py-pipeline.yaml')
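
# A minimal sketch (an assumption, not part of this sample) of submitting the
# compiled package to a Kubeflow Pipelines endpoint; the host URL and run
# arguments below are placeholders:
#
#     from kfp.client import Client
#
#     client = Client(host="http://localhost:8080")  # hypothetical endpoint
#     client.create_run_from_pipeline_package(
#         "intel-xgboost-daal4py-pipeline.yaml",
#         arguments={
#             "data_url": "<url-to-credit_risk_dataset.csv>",  # placeholder
#             "data_size": 1_000_000,
#         },
#     )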